Module topicnet.cooking_machine.cubes.base_cube
Expand source code
import os
from tqdm import tqdm
import warnings
from multiprocessing import Queue, Process
from artm.wrapper.exceptions import ArtmException
from .strategy import BaseStrategy
from ..models.base_model import padd_model_name
from ..routine import get_timestamp_in_str_format
NUM_MODELS_ERROR = "Failed to retrieve number of trained models"
MODEL_RETRIEVE_ERROR = "Retrieved only {0} models out of {1}"
STRATEGY_RETRIEVE_ERROR = 'Failed to retrieve strategy parameters'
WARNINGS_RETRIEVE_ERROR = 'Failed to return warnings'
SCORE_ERROR_MESSAGE = "Can't find a score '{0}'. Please add a score with that name to the model."
def check_experiment_existence(topic_model):
    """
    Checks whether topic_model has an experiment attached.
    Parameters
    ----------
    topic_model : TopicModel
        topic model
    Returns
    -------
    bool
        True if the experiment exists, False otherwise.
    """
    is_experiment = topic_model.experiment is not None
    return is_experiment
def retrieve_score_for_strategy(score_name=None):
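    # Returns a callable that extracts the last recorded value of the named score
    # from a model; defaults to 'PerplexityScore@all' when no score name is given.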
    if not score_name:
        score_name = 'PerplexityScore@all'
    def last_score(model):
        try:
            return model.scores[score_name][-1]
        except KeyError:
            raise KeyError(SCORE_ERROR_MESSAGE.format(score_name))
    return last_score
# exists for multiprocessing debug
def put_to_queue(queue, puttable):
    queue.put(puttable)
# exists for multiprocessing debug
def get_from_queue_till_fail(queue, error_message=''):
    return queue.get()
class BaseCube:
    """
    Abstract class for all cubes.
    """
    def __init__(self, num_iter, action=None, reg_search="grid",
                 strategy=None, tracked_score_function=None,
                 verbose=False, separate_thread=True):
        """
        Initialize the stage.
        Checks the parameters and updates the .parameters attribute.
        Parameters
        ----------
        num_iter : int
            number of iterations or method
        action : str
            stage of creation
        reg_search : str
            "grid" or "pair". "pair" for elementwise grid search in the case
            of several regularizers, "grid" for the fullgrid search in the
            case of several regularizers
        strategy : BaseStrategy
            optimization approach
        tracked_score_function : str or callable
            optimizable function for strategy
        verbose : bool
            visualization flag
        separate_thread : bool
            will train models inside a separate thread if True
        """
        self.num_iter = num_iter
        self.parameters = []
        self.action = action
        self.reg_search = reg_search
        if not strategy:
            strategy = BaseStrategy()
        self.strategy = strategy
        self.verbose = verbose
        self.separate_thread = separate_thread
        if isinstance(tracked_score_function, str):
            tracked_score_function = retrieve_score_for_strategy(tracked_score_function)
        self.tracked_score_function = tracked_score_function
    def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None):
        """
        "apply" method changes topic_model in way that is defined by one_cube_parameter.
        Parameters
        ----------
        topic_model : TopicModel
            topic model
        one_cube_parameter : optional
            parameters of one experiment
        dictionary : dict
            dictionary to be used when initializing
            or modifying the model (Default value = None)
        model_id : str
            id of created model if necessary (Default value = None)
        Returns
        -------
        """
        raise NotImplementedError('must be implemented in subclass')
    # TODO: because of the get_description method, more requirements are imposed on this function than documented here
    def get_jsonable_from_parameters(self):
        """
        Transform self.parameters into something that can be serialized to JSON.
        Parameters
        ----------
        Returns
        -------
        optional
            something jsonable
        """
        return self.parameters
    def _train_models(self, experiment, topic_model, dataset, search_space):
        """
        Trains a model for every point of the search space,
        saves each one and returns the save paths in training order.
        """
        dataset_trainable = dataset._transform_data_for_training()
        dataset_dictionary = dataset.get_dictionary()
        returned_paths = []
        experiment_save_path = experiment.save_path
        experiment_id = experiment.experiment_id
        save_folder = os.path.join(experiment_save_path, experiment_id)
        for search_point in search_space:
            candidate_name = get_timestamp_in_str_format()
            new_model_id = padd_model_name(candidate_name)
            new_model_save_path = os.path.join(save_folder, new_model_id)
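            # ensure a unique save path: append an increasing suffix on name collisions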
            model_index = 0
            while os.path.exists(new_model_save_path):
                model_index += 1
                new_model_id = padd_model_name("{0}{1:_>5}".format(candidate_name, model_index))
                new_model_save_path = os.path.join(save_folder, new_model_id)
            model_cube = {
                "action": self.action,
                "num_iter": self.num_iter,
                "params": repr(search_point)
            }
            try:
                # alter the model according to cube parameters
                new_model = self.apply(topic_model, search_point, dataset_dictionary, new_model_id)
                # train new model for a number of iterations (might be zero)
                new_model._fit(
                    dataset_trainable=dataset_trainable,
                    num_iterations=self.num_iter
                )
            except ArtmException as e:
                error_message = repr(e)
                raise ValueError(
                    f'Cannot alter and fit artm model with parameters {search_point}.\n'
                    "ARTM failed with following: " + error_message
                )
            # add cube description to the model history
            new_model.add_cube(model_cube)
            new_model.experiment = experiment
            new_model.save()
            assert os.path.exists(new_model.model_default_save_path)
            returned_paths.append(new_model.model_default_save_path)
            # some strategies depend on previous train results, therefore scores must be updated
            if self.tracked_score_function:
                current_score = self.tracked_score_function(new_model)
                self.strategy.update_scores(current_score)
            # else:
                # we return number of iterations as a placeholder
                # current_score = len(returned_paths)
        return returned_paths
    def _retrieve_results_from_process(self, queue, experiment):
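        # Read results from the queue in the order they were put by
        # _train_models_and_report_results: number of models, model save paths,
        # strategy parameters, caught warnings.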
        from ..models import DummyTopicModel
        models_num = get_from_queue_till_fail(queue, NUM_MODELS_ERROR)
        topic_models = []
        for model_number in range(models_num):
            path = get_from_queue_till_fail(queue,
                                            MODEL_RETRIEVE_ERROR.format(model_number, models_num))
            topic_models.append(DummyTopicModel.load(path, experiment=experiment))
        strategy_parameters = get_from_queue_till_fail(queue, STRATEGY_RETRIEVE_ERROR)
        caught_warnings = get_from_queue_till_fail(queue, WARNINGS_RETRIEVE_ERROR)
        self.strategy._set_strategy_parameters(strategy_parameters)
        for (warning_message, warning_class) in caught_warnings:
            # if issubclass(warning_class, UserWarning):
            warnings.warn(warning_message)
        return topic_models
    def _train_models_and_report_results(self, queue, experiment, topic_model, dataset,
                                         search_space, search_length):
        """
        Trains models in a separate process, saves them and puts
        their save paths into the queue in training order.
        The number of models is put into the queue first so that the training order can be restored.
        """
        with warnings.catch_warnings(record=True) as caught_warnings:
            returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
            put_to_queue(queue, len(returned_paths))
            for path in returned_paths:
                put_to_queue(queue, path)
            # to work with strategy we recover consistency by sending important parameters
            strategy_parameters = self.strategy._get_strategy_parameters(saveable_only=True)
            put_to_queue(queue, strategy_parameters)
            caught_warnings = [(warning.message, warning.category)
                               for warning in caught_warnings]
            put_to_queue(queue, caught_warnings)
    def _run_cube(self, topic_model, dataset):
        """
        Apply the cube to topic_model: get new models and fit them on the dataset.
        Return a list of all trained models.
        Parameters
        ----------
        topic_model : TopicModel
        dataset : Dataset
        Returns
        -------
        list of TopicModel
        """
        from ..models import DummyTopicModel
        if isinstance(topic_model, DummyTopicModel):
            topic_model = topic_model.restore()
        # create log
        # TODO: this will behave strangely if the parameter list is infinite
        parameter_description = self.get_jsonable_from_parameters()
        cube_description = {
            'action': self.action,
            'params': parameter_description
        }
        # at one level only one cube can be implemented
        if not check_experiment_existence(topic_model):
            raise ValueError("TopicModel has no experiment. You should create Experiment.")
        experiment = topic_model.experiment
        topic_model_depth_in_tree = topic_model.depth
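        # if a cube already exists at this depth of the tree, it must have the same action and parameters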
        if topic_model_depth_in_tree < len(experiment.cubes):
            existed_cube = experiment.cubes[topic_model_depth_in_tree]
            if existed_cube['params'] != cube_description['params'] or \
                    existed_cube['action'] != cube_description['action']:
                error_message = (
                    "\nYou can not change strategy to another on this level in "
                    "this experiment.\n"
                    "If you want you can create another experiment with this "
                    "model with parameter new_experiment=True."
                    f"the existing cube is \n {existed_cube['params']} \n, "
                    f"but the proposed cube is \n {cube_description['params']} \n"
                )
                raise ValueError(error_message)
            is_new_exp_cube = False
        else:
            is_new_exp_cube = True
        # perform all experiments
        self.strategy.prepare_grid(self.parameters, self.reg_search)
        search_space = self.strategy.grid_visit_generator(self.parameters, self.reg_search)
        search_length = getattr(self.strategy, 'grid_len', None)
        if self.verbose:
            search_space = tqdm(search_space, total=search_length)
        if self.separate_thread:
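            # training runs in a child process; results come back through the queue
            # in a fixed order (model count, paths, strategy parameters, warnings)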
            queue = Queue()
            process = Process(
                target=self._train_models_and_report_results,
                args=(queue, experiment, topic_model, dataset,
                      search_space, search_length),
                daemon=True
            )
            process.start()
            topic_models = self._retrieve_results_from_process(queue, experiment)
        else:
            returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
            topic_models = [
                DummyTopicModel.load(path, experiment=experiment)
                for path in returned_paths
            ]
        for topic_model in topic_models:
            topic_model.data_path = dataset._data_path
            experiment.add_model(topic_model)
        if is_new_exp_cube:
            experiment.add_cube(cube_description)
        return topic_models
    def __call__(self, topic_model_input, dataset):
        """
        Apply the cube to each input topic model: get new models and fit them on the dataset.
        Return a list of all trained models.
        Parameters
        ----------
        topic_model_input: TopicModel or list of TopicModel
        dataset: Dataset
        Returns
        -------
        list of TopicModel (a list of such lists if the input is a collection of models)
        """
        if isinstance(topic_model_input, (list, set)):
            results = [
                self._run_cube(topic_model, dataset)
                for topic_model in topic_model_input
            ]
            return results
        return self._run_cube(topic_model_input, dataset)
Functions
def check_experiment_existence(topic_model)
Checks whether topic_model has an experiment attached.
Parameters
topic_model : TopicModel - topic model

Returns
bool - True if the experiment exists, False otherwise.
 
def get_from_queue_till_fail(queue, error_message='')

def put_to_queue(queue, puttable)

def retrieve_score_for_strategy(score_name=None)
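The snippet below is a small illustrative sketch (not part of the module) of how retrieve_score_for_strategy can be used, for example as a tracked_score_function. The model object is a stand-in exposing only a scores mapping of score name to a list of values, which is all the returned callable needs; it is not a real TopicModel.

from types import SimpleNamespace

from topicnet.cooking_machine.cubes.base_cube import retrieve_score_for_strategy

# the returned callable extracts the last recorded value of the named score
last_perplexity = retrieve_score_for_strategy('PerplexityScore@all')

# stand-in model: anything with a `scores` mapping of score name -> list of values
fake_model = SimpleNamespace(scores={'PerplexityScore@all': [5200.0, 3100.0, 2400.0]})

print(last_perplexity(fake_model))  # 2400.0

# asking for a score the model does not track raises KeyError with a helpful message
try:
    retrieve_score_for_strategy('MissingScore')(fake_model)
except KeyError as error:
    print(error)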
Classes
class BaseCube (num_iter, action=None, reg_search='grid', strategy=None, tracked_score_function=None, verbose=False, separate_thread=True)
Abstract class for all cubes.
Initialize the stage. Checks the parameters and updates the .parameters attribute.
Parameters
num_iter : int - number of iterations or method
action : str - stage of creation
reg_search : str - "grid" or "pair". "pair" performs an element-wise (paired) search when there are several regularizers, "grid" performs the full grid search over them
strategy : BaseStrategy - optimization approach
tracked_score_function : str or callable - optimizable function for strategy
verbose : bool - visualization flag
separate_thread : bool - will train models inside a separate thread if True
 
Methods
def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None)
The "apply" method changes topic_model in the way defined by one_cube_parameter.
Parameters
topic_model : TopicModel - topic model
one_cube_parameter : optional - parameters of one experiment
dictionary : dict - dictionary to be used when initializing or modifying the model (Default value = None)
model_id : str - id of the created model if necessary (Default value = None)
 
Returns
def get_jsonable_from_parameters(self)
Transform self.parameters into something that can be serialized to JSON.
Returns
optional - something jsonable
 