Module topicnet.cooking_machine.cubes.base_cube

Expand source code
import os
from tqdm import tqdm
import warnings
from multiprocessing import Queue, Process
from artm.wrapper.exceptions import ArtmException

from .strategy import BaseStrategy
from ..models.base_model import padd_model_name
from ..routine import get_timestamp_in_str_format

# Messages naming the step at which fetching results from the training process
# went wrong; they are passed to `get_from_queue_till_fail` as `error_message`.
NUM_MODELS_ERROR = "Failed to retrieve number of trained models"
MODEL_RETRIEVE_ERROR = "Retrieved only {0} models out of {1}"
STRATEGY_RETRIEVE_ERROR = 'Failed to retrieve strategy parameters'
WARNINGS_RETRIEVE_ERROR = 'Failed to return warnings'
# Message of the KeyError raised when the tracked score is absent from a model.
SCORE_ERROR_MESSAGE = "Can't find a score ''{0}''. Please add a score with that name to the model."


def check_experiment_existence(topic_model):
    """
    Tell whether an experiment is attached to the given topic model.

    Parameters
    ----------
    topic_model : TopicModel
        model whose ``experiment`` attribute is inspected

    Returns
    -------
    bool
        True if ``topic_model.experiment`` is not None, otherwise False.

    """
    return topic_model.experiment is not None


def retrieve_score_for_strategy(score_name=None):
    """
    Build a function that reads the latest value of a score from a model.

    Parameters
    ----------
    score_name : str
        name of the score to read; 'PerplexityScore@all' is used
        when the name is empty or None

    Returns
    -------
    callable
        function taking a model and returning ``model.scores[score_name][-1]``;
        it raises KeyError with an explanatory message if the lookup fails
        with a KeyError.

    """
    score_name = score_name or 'PerplexityScore@all'

    def last_score(model):
        try:
            score_history = model.scores[score_name]
            return score_history[-1]
        except KeyError:
            raise KeyError(SCORE_ERROR_MESSAGE.format(score_name))

    return last_score


# exists for multiprocessing debug
def put_to_queue(queue, puttable):
    """
    Put one object into the queue.

    A separate function so that the call is easy to trace
    while debugging multiprocessing.

    Parameters
    ----------
    queue : Queue
        any object with a ``put`` method
    puttable : optional
        object to put into the queue

    """
    queue.put(puttable)


# exists for multiprocessing debug
def get_from_queue_till_fail(queue, error_message=''):
    """
    Take the next object out of the queue.

    A separate function so that the call is easy to trace
    while debugging multiprocessing.

    Parameters
    ----------
    queue : Queue
        any object with a ``get`` method
    error_message : str
        accepted for the callers' convenience, currently not used

    Returns
    -------
    optional
        whatever ``queue.get()`` returns

    """
    return queue.get()


class BaseCube:
    """
    Abstract class for all cubes.

    Subclasses implement `apply`; the training loop, the bookkeeping of the
    experiment and the optional training in a child process live here.

    """
    def __init__(self, num_iter, action=None, reg_search="grid",
                 strategy=None, tracked_score_function=None,
                 verbose=False, separate_thread=True):
        """
        Initialize stage.
        Stores the settings and sets the .parameters attribute to an empty list.

        Parameters
        ----------
        num_iter : int
            number of iterations every new model is fitted for
        action : str
            stage of creation
        reg_search : str
            "grid" or "pair". "pair" for elementwise grid search in the case
            of several regularizers, "grid" for the full grid search in the
            case of several regularizers
        strategy : BaseStrategy
            optimization approach; a BaseStrategy is created if nothing is given
        tracked_score_function : str or callable
            optimizable function for strategy; a string is taken as a score
            name and turned into a function by `retrieve_score_for_strategy`
        verbose : bool
            visualization flag, wraps the search space into a tqdm progress bar
        separate_thread : bool
            will train models inside a separate process if True

        """
        self.num_iter = num_iter
        self.parameters = []
        self.action = action
        self.reg_search = reg_search
        if not strategy:
            strategy = BaseStrategy()
        self.strategy = strategy
        self.verbose = verbose
        self.separate_thread = separate_thread

        if isinstance(tracked_score_function, str):
            tracked_score_function = retrieve_score_for_strategy(tracked_score_function)
        self.tracked_score_function = tracked_score_function

    def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None):
        """
        "apply" method changes topic_model in way that is defined by one_cube_parameter.

        Parameters
        ----------
        topic_model : TopicModel
            topic model
        one_cube_parameter : optional
            parameters of one experiment
        dictionary : dict
            dictionary handed over by the training loop
            (Default value = None)
        model_id : str
            id of created model if necessary (Default value = None)

        Returns
        -------
        TopicModel
            the training loop fits and saves the returned model

        Raises
        ------
        NotImplementedError
            always, the method must be implemented in a subclass

        """
        raise NotImplementedError('must be implemented in subclass')

    # TODO: because of the get_description method this function is subject
    # to more requirements than are written here
    def get_jsonable_from_parameters(self):
        """
        Transform self.parameters to something that can be downloaded as json.

        Returns
        -------
        optional
            something jsonable; the base implementation returns
            self.parameters unchanged

        """
        return self.parameters

    def _train_models(self, experiment, topic_model, dataset, search_space):
        """
        Train one model per point of the search space and save each of them.

        Parameters
        ----------
        experiment : Experiment
            experiment the new models are attached to
        topic_model : TopicModel
            model every new model is derived from
        dataset : Dataset
            provides the training data and the dictionary
        search_space : iterable
            points to visit, each one is passed to `apply`

        Returns
        -------
        list
            save paths of the trained models in training order

        Raises
        ------
        ValueError
            if altering or fitting a model fails with an ArtmException

        """
        dataset_trainable = dataset._transform_data_for_training()
        dataset_dictionary = dataset.get_dictionary()
        returned_paths = []
        save_folder = os.path.join(experiment.save_path, experiment.experiment_id)
        for search_point in search_space:
            candidate_name = get_timestamp_in_str_format()
            new_model_id = padd_model_name(candidate_name)
            new_model_save_path = os.path.join(save_folder, new_model_id)
            # several models may get the same timestamp: append a counter
            # until the name does not clash with an existing folder
            model_index = 0
            while os.path.exists(new_model_save_path):
                model_index += 1
                new_model_id = padd_model_name("{0}{1:_>5}".format(candidate_name, model_index))
                new_model_save_path = os.path.join(save_folder, new_model_id)

            model_cube = {
                "action": self.action,
                "num_iter": self.num_iter,
                "params": repr(search_point)
            }

            try:
                # alter the model according to cube parameters
                new_model = self.apply(topic_model, search_point, dataset_dictionary, new_model_id)
                # train new model for a number of iterations (might be zero)
                new_model._fit(
                    dataset_trainable=dataset_trainable,
                    num_iterations=self.num_iter
                )
            except ArtmException as e:
                error_message = repr(e)
                # keep the original ARTM error as the explicit cause
                raise ValueError(
                    f'Cannot alter and fit artm model with parameters {search_point}.\n'
                    "ARTM failed with following: " + error_message
                ) from e
            # add cube description to the model history
            new_model.add_cube(model_cube)
            new_model.experiment = experiment
            new_model.save()
            assert os.path.exists(new_model.model_default_save_path)

            returned_paths.append(new_model.model_default_save_path)

            # some strategies depend on previous train results, therefore scores must be updated
            if self.tracked_score_function:
                current_score = self.tracked_score_function(new_model)
                self.strategy.update_scores(current_score)

        return returned_paths

    def _retrieve_results_from_process(self, queue, experiment):
        """
        Read from the queue everything `_train_models_and_report_results` put there.

        The items are expected in this order: number of models, one save path
        per model, strategy parameters, caught warnings. If the first item is
        an exception, training failed in the child process and the exception
        is raised here.

        Parameters
        ----------
        queue : Queue
            queue shared with the training process
        experiment : Experiment
            experiment the loaded models are attached to

        Returns
        -------
        list of DummyTopicModel
            models loaded from the received paths, in training order

        """
        from ..models import DummyTopicModel
        models_num = get_from_queue_till_fail(queue, NUM_MODELS_ERROR)
        if isinstance(models_num, Exception):
            # training failed in the child process, report it in the parent
            raise models_num

        topic_models = []
        for model_index in range(models_num):
            path = get_from_queue_till_fail(
                queue, MODEL_RETRIEVE_ERROR.format(model_index, models_num)
            )
            topic_models.append(DummyTopicModel.load(path, experiment=experiment))

        strategy_parameters = get_from_queue_till_fail(queue, STRATEGY_RETRIEVE_ERROR)
        caught_warnings = get_from_queue_till_fail(queue, WARNINGS_RETRIEVE_ERROR)
        self.strategy._set_strategy_parameters(strategy_parameters)

        # re-emit in this process the warnings recorded in the training process
        for warning_message, _warning_class in caught_warnings:
            warnings.warn(warning_message)

        return topic_models

    def _train_models_and_report_results(self, queue, experiment, topic_model, dataset,
                                         search_space, search_length):
        """
        This function trains models in separate process, saves them
        and reports all save paths through the queue with respect to train order.
        To preserve train order the number of models is sent first.

        If training raises an exception, the exception is put into the queue
        instead (and raised again), otherwise the process reading the queue
        would wait for the results forever.

        Parameters
        ----------
        queue : Queue
            queue the results are put into
        experiment : Experiment
        topic_model : TopicModel
        dataset : Dataset
        search_space : iterable
            points to visit
        search_length : int
            not used here

        """
        with warnings.catch_warnings(record=True) as caught_warnings:
            try:
                returned_paths = self._train_models(
                    experiment, topic_model, dataset, search_space
                )
            except Exception as error:
                put_to_queue(queue, error)
                raise

            put_to_queue(queue, len(returned_paths))
            for path in returned_paths:
                put_to_queue(queue, path)

            # to work with strategy we recover consistency by sending important parameters
            strategy_parameters = self.strategy._get_strategy_parameters(saveable_only=True)
            put_to_queue(queue, strategy_parameters)

            caught_warnings = [(warning.message, warning.category)
                               for warning in caught_warnings]
            put_to_queue(queue, caught_warnings)

    def _run_cube(self, topic_model, dataset):
        """
        Apply cube to topic_model. Get new models and fit them on the dataset.
        Return list of all trained models.

        Parameters
        ----------
        topic_model : TopicModel
        dataset : Dataset

        Returns
        -------
        list of DummyTopicModel

        Raises
        ------
        ValueError
            if the model has no experiment, or if the experiment already has
            a different cube on the level of this model

        """

        from ..models import DummyTopicModel
        if isinstance(topic_model, DummyTopicModel):
            topic_model = topic_model.restore()

        # create log
        # TODO: will behave strangely if the list is infinite
        parameter_description = self.get_jsonable_from_parameters()
        cube_description = {
            'action': self.action,
            'params': parameter_description
        }

        # at one level only one cube can be implemented
        if not check_experiment_existence(topic_model):
            raise ValueError("TopicModel has no experiment. You should create Experiment.")
        experiment = topic_model.experiment
        topic_model_depth_in_tree = topic_model.depth
        is_new_exp_cube = topic_model_depth_in_tree >= len(experiment.cubes)
        if not is_new_exp_cube:
            existed_cube = experiment.cubes[topic_model_depth_in_tree]
            if existed_cube['params'] != cube_description['params'] or \
                    existed_cube['action'] != cube_description['action']:
                error_message = (
                    "\nYou can not change strategy to another on this level in "
                    "this experiment.\n"
                    "If you want you can create another experiment with this "
                    "model with parameter new_experiment=True.\n"
                    f"the existing cube is \n {existed_cube['params']} \n, "
                    f"but the proposed cube is \n {cube_description['params']} \n"
                )
                raise ValueError(error_message)

        # perform all experiments
        self.strategy.prepare_grid(self.parameters, self.reg_search)
        search_space = self.strategy.grid_visit_generator(self.parameters, self.reg_search)
        search_length = getattr(self.strategy, 'grid_len', None)

        if self.verbose:
            search_space = tqdm(search_space, total=search_length)

        if self.separate_thread:
            queue = Queue()
            process = Process(
                target=self._train_models_and_report_results,
                args=(queue, experiment, topic_model, dataset,
                      search_space, search_length),
                daemon=True
            )
            process.start()
            topic_models = self._retrieve_results_from_process(queue, experiment)
        else:
            returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
            topic_models = [
                DummyTopicModel.load(path, experiment=experiment)
                for path in returned_paths
            ]

        for trained_model in topic_models:
            trained_model.data_path = dataset._data_path
            experiment.add_model(trained_model)

        if is_new_exp_cube:
            experiment.add_cube(cube_description)

        return topic_models

    def __call__(self, topic_model_input, dataset):
        """
        Apply cube to topic_model. Get new models and fit them on the dataset.
        Return list of all trained models.

        Parameters
        ----------
        topic_model_input: TopicModel or list of TopicModel
            a set of models is accepted as well
        dataset: Dataset

        Returns
        -------
        list
            trained models for a single input model; for a list or a set
            of input models, a list with one such list per input model

        """
        if isinstance(topic_model_input, (list, set)):
            return [
                self._run_cube(topic_model, dataset)
                for topic_model in topic_model_input
            ]
        return self._run_cube(topic_model_input, dataset)

Functions

def check_experiment_existence(topic_model)

Checks if topic_model has experiment.

Parameters

topic_model : TopicModel
topic model

Returns

bool
True if the experiment exists, otherwise False.
Expand source code
def check_experiment_existence(topic_model):
    """
    Tell whether an experiment is attached to the given topic model.

    Parameters
    ----------
    topic_model : TopicModel
        model whose ``experiment`` attribute is inspected

    Returns
    -------
    bool
        True if ``topic_model.experiment`` is not None, otherwise False.

    """
    return topic_model.experiment is not None
def get_from_queue_till_fail(queue, error_message='')
Expand source code
def get_from_queue_till_fail(queue, error_message=''):
    """
    Take the next object out of the queue.

    A separate function so that the call is easy to trace
    while debugging multiprocessing.

    Parameters
    ----------
    queue : Queue
        any object with a ``get`` method
    error_message : str
        accepted for the callers' convenience, currently not used

    Returns
    -------
    optional
        whatever ``queue.get()`` returns

    """
    return queue.get()
def put_to_queue(queue, puttable)
Expand source code
def put_to_queue(queue, puttable):
    """
    Put one object into the queue.

    A separate function so that the call is easy to trace
    while debugging multiprocessing.

    Parameters
    ----------
    queue : Queue
        any object with a ``put`` method
    puttable : optional
        object to put into the queue

    """
    queue.put(puttable)
def retrieve_score_for_strategy(score_name=None)
Expand source code
def retrieve_score_for_strategy(score_name=None):
    """
    Build a function that reads the latest value of a score from a model.

    Parameters
    ----------
    score_name : str
        name of the score to read; 'PerplexityScore@all' is used
        when the name is empty or None

    Returns
    -------
    callable
        function taking a model and returning ``model.scores[score_name][-1]``;
        it raises KeyError with an explanatory message if the lookup fails
        with a KeyError.

    """
    score_name = score_name or 'PerplexityScore@all'

    def last_score(model):
        try:
            score_history = model.scores[score_name]
            return score_history[-1]
        except KeyError:
            raise KeyError(SCORE_ERROR_MESSAGE.format(score_name))

    return last_score

Classes

class BaseCube (num_iter, action=None, reg_search='grid', strategy=None, tracked_score_function=None, verbose=False, separate_thread=True)

Abstract class for all cubes.

Initialize stage. Checks params and update .parameters attribute.

Parameters

num_iter : int
number of iterations or method
action : str
stage of creation
reg_search : str
"grid" or "pair". With several regularizers, "pair" runs an elementwise grid search, while "grid" runs the full grid search.
strategy : BaseStrategy
optimization approach
tracked_score_function : str or callable
optimizable function for strategy
verbose : bool
visualization flag
separate_thread : bool
will train models inside a separate process if True
Expand source code
class BaseCube:
    """
    Abstract class for all cubes.

    Subclasses implement `apply`; the training loop, the bookkeeping of the
    experiment and the optional training in a child process live here.

    """
    def __init__(self, num_iter, action=None, reg_search="grid",
                 strategy=None, tracked_score_function=None,
                 verbose=False, separate_thread=True):
        """
        Initialize stage.
        Stores the settings and sets the .parameters attribute to an empty list.

        Parameters
        ----------
        num_iter : int
            number of iterations every new model is fitted for
        action : str
            stage of creation
        reg_search : str
            "grid" or "pair". "pair" for elementwise grid search in the case
            of several regularizers, "grid" for the full grid search in the
            case of several regularizers
        strategy : BaseStrategy
            optimization approach; a BaseStrategy is created if nothing is given
        tracked_score_function : str or callable
            optimizable function for strategy; a string is taken as a score
            name and turned into a function by `retrieve_score_for_strategy`
        verbose : bool
            visualization flag, wraps the search space into a tqdm progress bar
        separate_thread : bool
            will train models inside a separate process if True

        """
        self.num_iter = num_iter
        self.parameters = []
        self.action = action
        self.reg_search = reg_search
        if not strategy:
            strategy = BaseStrategy()
        self.strategy = strategy
        self.verbose = verbose
        self.separate_thread = separate_thread

        if isinstance(tracked_score_function, str):
            tracked_score_function = retrieve_score_for_strategy(tracked_score_function)
        self.tracked_score_function = tracked_score_function

    def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None):
        """
        "apply" method changes topic_model in way that is defined by one_cube_parameter.

        Parameters
        ----------
        topic_model : TopicModel
            topic model
        one_cube_parameter : optional
            parameters of one experiment
        dictionary : dict
            dictionary handed over by the training loop
            (Default value = None)
        model_id : str
            id of created model if necessary (Default value = None)

        Returns
        -------
        TopicModel
            the training loop fits and saves the returned model

        Raises
        ------
        NotImplementedError
            always, the method must be implemented in a subclass

        """
        raise NotImplementedError('must be implemented in subclass')

    # TODO: because of the get_description method this function is subject
    # to more requirements than are written here
    def get_jsonable_from_parameters(self):
        """
        Transform self.parameters to something that can be downloaded as json.

        Returns
        -------
        optional
            something jsonable; the base implementation returns
            self.parameters unchanged

        """
        return self.parameters

    def _train_models(self, experiment, topic_model, dataset, search_space):
        """
        Train one model per point of the search space and save each of them.

        Parameters
        ----------
        experiment : Experiment
            experiment the new models are attached to
        topic_model : TopicModel
            model every new model is derived from
        dataset : Dataset
            provides the training data and the dictionary
        search_space : iterable
            points to visit, each one is passed to `apply`

        Returns
        -------
        list
            save paths of the trained models in training order

        Raises
        ------
        ValueError
            if altering or fitting a model fails with an ArtmException

        """
        dataset_trainable = dataset._transform_data_for_training()
        dataset_dictionary = dataset.get_dictionary()
        returned_paths = []
        save_folder = os.path.join(experiment.save_path, experiment.experiment_id)
        for search_point in search_space:
            candidate_name = get_timestamp_in_str_format()
            new_model_id = padd_model_name(candidate_name)
            new_model_save_path = os.path.join(save_folder, new_model_id)
            # several models may get the same timestamp: append a counter
            # until the name does not clash with an existing folder
            model_index = 0
            while os.path.exists(new_model_save_path):
                model_index += 1
                new_model_id = padd_model_name("{0}{1:_>5}".format(candidate_name, model_index))
                new_model_save_path = os.path.join(save_folder, new_model_id)

            model_cube = {
                "action": self.action,
                "num_iter": self.num_iter,
                "params": repr(search_point)
            }

            try:
                # alter the model according to cube parameters
                new_model = self.apply(topic_model, search_point, dataset_dictionary, new_model_id)
                # train new model for a number of iterations (might be zero)
                new_model._fit(
                    dataset_trainable=dataset_trainable,
                    num_iterations=self.num_iter
                )
            except ArtmException as e:
                error_message = repr(e)
                # keep the original ARTM error as the explicit cause
                raise ValueError(
                    f'Cannot alter and fit artm model with parameters {search_point}.\n'
                    "ARTM failed with following: " + error_message
                ) from e
            # add cube description to the model history
            new_model.add_cube(model_cube)
            new_model.experiment = experiment
            new_model.save()
            assert os.path.exists(new_model.model_default_save_path)

            returned_paths.append(new_model.model_default_save_path)

            # some strategies depend on previous train results, therefore scores must be updated
            if self.tracked_score_function:
                current_score = self.tracked_score_function(new_model)
                self.strategy.update_scores(current_score)

        return returned_paths

    def _retrieve_results_from_process(self, queue, experiment):
        """
        Read from the queue everything `_train_models_and_report_results` put there.

        The items are expected in this order: number of models, one save path
        per model, strategy parameters, caught warnings. If the first item is
        an exception, training failed in the child process and the exception
        is raised here.

        Parameters
        ----------
        queue : Queue
            queue shared with the training process
        experiment : Experiment
            experiment the loaded models are attached to

        Returns
        -------
        list of DummyTopicModel
            models loaded from the received paths, in training order

        """
        from ..models import DummyTopicModel
        models_num = get_from_queue_till_fail(queue, NUM_MODELS_ERROR)
        if isinstance(models_num, Exception):
            # training failed in the child process, report it in the parent
            raise models_num

        topic_models = []
        for model_index in range(models_num):
            path = get_from_queue_till_fail(
                queue, MODEL_RETRIEVE_ERROR.format(model_index, models_num)
            )
            topic_models.append(DummyTopicModel.load(path, experiment=experiment))

        strategy_parameters = get_from_queue_till_fail(queue, STRATEGY_RETRIEVE_ERROR)
        caught_warnings = get_from_queue_till_fail(queue, WARNINGS_RETRIEVE_ERROR)
        self.strategy._set_strategy_parameters(strategy_parameters)

        # re-emit in this process the warnings recorded in the training process
        for warning_message, _warning_class in caught_warnings:
            warnings.warn(warning_message)

        return topic_models

    def _train_models_and_report_results(self, queue, experiment, topic_model, dataset,
                                         search_space, search_length):
        """
        This function trains models in separate process, saves them
        and reports all save paths through the queue with respect to train order.
        To preserve train order the number of models is sent first.

        If training raises an exception, the exception is put into the queue
        instead (and raised again), otherwise the process reading the queue
        would wait for the results forever.

        Parameters
        ----------
        queue : Queue
            queue the results are put into
        experiment : Experiment
        topic_model : TopicModel
        dataset : Dataset
        search_space : iterable
            points to visit
        search_length : int
            not used here

        """
        with warnings.catch_warnings(record=True) as caught_warnings:
            try:
                returned_paths = self._train_models(
                    experiment, topic_model, dataset, search_space
                )
            except Exception as error:
                put_to_queue(queue, error)
                raise

            put_to_queue(queue, len(returned_paths))
            for path in returned_paths:
                put_to_queue(queue, path)

            # to work with strategy we recover consistency by sending important parameters
            strategy_parameters = self.strategy._get_strategy_parameters(saveable_only=True)
            put_to_queue(queue, strategy_parameters)

            caught_warnings = [(warning.message, warning.category)
                               for warning in caught_warnings]
            put_to_queue(queue, caught_warnings)

    def _run_cube(self, topic_model, dataset):
        """
        Apply cube to topic_model. Get new models and fit them on the dataset.
        Return list of all trained models.

        Parameters
        ----------
        topic_model : TopicModel
        dataset : Dataset

        Returns
        -------
        list of DummyTopicModel

        Raises
        ------
        ValueError
            if the model has no experiment, or if the experiment already has
            a different cube on the level of this model

        """

        from ..models import DummyTopicModel
        if isinstance(topic_model, DummyTopicModel):
            topic_model = topic_model.restore()

        # create log
        # TODO: will behave strangely if the list is infinite
        parameter_description = self.get_jsonable_from_parameters()
        cube_description = {
            'action': self.action,
            'params': parameter_description
        }

        # at one level only one cube can be implemented
        if not check_experiment_existence(topic_model):
            raise ValueError("TopicModel has no experiment. You should create Experiment.")
        experiment = topic_model.experiment
        topic_model_depth_in_tree = topic_model.depth
        is_new_exp_cube = topic_model_depth_in_tree >= len(experiment.cubes)
        if not is_new_exp_cube:
            existed_cube = experiment.cubes[topic_model_depth_in_tree]
            if existed_cube['params'] != cube_description['params'] or \
                    existed_cube['action'] != cube_description['action']:
                error_message = (
                    "\nYou can not change strategy to another on this level in "
                    "this experiment.\n"
                    "If you want you can create another experiment with this "
                    "model with parameter new_experiment=True.\n"
                    f"the existing cube is \n {existed_cube['params']} \n, "
                    f"but the proposed cube is \n {cube_description['params']} \n"
                )
                raise ValueError(error_message)

        # perform all experiments
        self.strategy.prepare_grid(self.parameters, self.reg_search)
        search_space = self.strategy.grid_visit_generator(self.parameters, self.reg_search)
        search_length = getattr(self.strategy, 'grid_len', None)

        if self.verbose:
            search_space = tqdm(search_space, total=search_length)

        if self.separate_thread:
            queue = Queue()
            process = Process(
                target=self._train_models_and_report_results,
                args=(queue, experiment, topic_model, dataset,
                      search_space, search_length),
                daemon=True
            )
            process.start()
            topic_models = self._retrieve_results_from_process(queue, experiment)
        else:
            returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
            topic_models = [
                DummyTopicModel.load(path, experiment=experiment)
                for path in returned_paths
            ]

        for trained_model in topic_models:
            trained_model.data_path = dataset._data_path
            experiment.add_model(trained_model)

        if is_new_exp_cube:
            experiment.add_cube(cube_description)

        return topic_models

    def __call__(self, topic_model_input, dataset):
        """
        Apply cube to topic_model. Get new models and fit them on the dataset.
        Return list of all trained models.

        Parameters
        ----------
        topic_model_input: TopicModel or list of TopicModel
            a set of models is accepted as well
        dataset: Dataset

        Returns
        -------
        list
            trained models for a single input model; for a list or a set
            of input models, a list with one such list per input model

        """
        if isinstance(topic_model_input, (list, set)):
            return [
                self._run_cube(topic_model, dataset)
                for topic_model in topic_model_input
            ]
        return self._run_cube(topic_model_input, dataset)

Subclasses

Methods

def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None)

"apply" method changes topic_model in way that is defined by one_cube_parameter.

Parameters

topic_model : TopicModel
topic model
one_cube_parameter : optional
parameters of one experiment
dictionary : dict
dictionary that can be used as the basis of the model (Default value = None)
model_id : str
id of created model if necessary (Default value = None)

Returns

Expand source code
def apply(self, topic_model, one_cube_parameter, dictionary=None, model_id=None):
    """
    Change topic_model in the way defined by one_cube_parameter.

    The base implementation is a placeholder and only raises.

    Parameters
    ----------
    topic_model : TopicModel
        model to alter
    one_cube_parameter : optional
        parameters of one experiment
    dictionary : dict
        dictionary made available to the method (Default value = None)
    model_id : str
        id of created model if necessary (Default value = None)

    Raises
    ------
    NotImplementedError
        always, the method must be implemented in a subclass

    """
    raise NotImplementedError('must be implemented in subclass')
def get_jsonable_from_parameters(self)

Transform self.parameters to something that can be downloaded as json.

Parameters

Returns

optional
something jsonable
Expand source code
def get_jsonable_from_parameters(self):
    """
    Give a representation of self.parameters that can be dumped as json.

    Returns
    -------
    optional
        something jsonable; here self.parameters is returned unchanged

    """
    return self.parameters