Module topicnet.cooking_machine.rel_toolbox_lite

Source code
import os
import uuid


def count_vocab_size(dictionary, modalities):
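    """
    Count the total token frequency and vocabulary size for each modality,
    along with the number of documents, by saving the artm dictionary to a
    temporary text file and parsing it.
    """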
    # TODO: check tokens filtered by dict.filter()
    fname = str(uuid.uuid4()) + '.txt'  # Plain 'tmp.txt' may fail if several processes work with the same file
    try:
        dictionary.save_text(fname)
        modality_count = {name: 0 for name in modalities}
        modality_vocab_size = {name: 0 for name in modalities}
        with open(fname, 'r', encoding='utf-8') as f:
            header = next(f)
            num_docs = int(header.partition("num_items: ")[2])
            next(f)
            for line in f:
                token, class_id, _, token_tf, token_df = line.split(", ")
                if class_id in modalities:
                    modality_count[class_id] += float(token_tf)
                    modality_vocab_size[class_id] += 1
        return (modality_count, modality_vocab_size, num_docs)
    finally:
        if os.path.isfile(fname):  # save_text may have failed before creating the file
            os.remove(fname)


def calc_docs_avg_len(ds, weights):
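    """
    Average document length, with each modality's token count scaled by its
    weight from `weights`.
    """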
    (modality_count, modality_vocab_size, n_docs) = ds
    docs_total_len = 0
    for modality, tokens_total_sum in modality_count.items():
        w = weights[modality]
        docs_total_len += w * tokens_total_sum
    avg_doc_len = docs_total_len / n_docs
    return avg_doc_len


def theta_weight_abs2rel(ds, modality_weights, n_topics, tau):
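    """
    Convert an absolute Theta-regularizer coefficient tau into a relative one:
        gimel = tau / (avg_doc_len / n_topics + tau)
    """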
    avg_doc_len = calc_docs_avg_len(ds, modality_weights)
    gimel_multiplier = avg_doc_len / n_topics + tau
    gimel = tau / gimel_multiplier
    return gimel


def theta_weight_rel2abs(ds, modality_weights, n_topics, gimel):
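    """
    Convert a relative Theta-regularizer coefficient gimel into an absolute one:
        tau = (avg_doc_len / n_topics) * gimel / (1 - gimel)
    """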
    avg_doc_len = calc_docs_avg_len(ds, modality_weights)
    tau = (avg_doc_len / n_topics) * gimel / (1 - gimel)
    return tau


def phi_weight_abs2rel(ds, modality_weights, n_topics, tau, modalities_list=None):
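    """
    Convert an absolute Phi-regularizer coefficient tau into a relative one;
    docs_total_len and vocab_size are summed over the selected modalities:
        odds  = tau * n_topics * vocab_size / docs_total_len
        gimel = odds / (1 + odds)
    """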
    (modality_count, modality_vocab_size, n_docs) = ds
    if modalities_list is None:
        modalities_list = modality_count.keys()
    docs_total_len = 0
    vocab_size = 0
    for modality in modalities_list:
        tokens_total_sum = modality_count[modality]
        vocab_size += modality_vocab_size[modality]
        w = modality_weights[modality]
        docs_total_len += w * tokens_total_sum
    # TODO: check if formula is OK
    odds_gimel = (tau * n_topics * vocab_size) / docs_total_len
    gimel = odds_gimel / (1 + odds_gimel)
    return gimel


def phi_weight_rel2abs(ds, modality_weights, n_topics, gimel, modalities_list=None):
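    """
    Convert a relative Phi-regularizer coefficient gimel into an absolute one:
        tau = (docs_total_len / (n_topics * vocab_size)) * gimel / (1 - gimel)
    """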
    (modality_count, modality_vocab_size, n_docs) = ds
    if modalities_list is None:
        modalities_list = modality_count.keys()
    docs_total_len = 0
    vocab_size = 0
    for modality in modalities_list:
        tokens_total_sum = modality_count[modality]
        vocab_size += modality_vocab_size[modality]
        w = modality_weights[modality]
        docs_total_len += w * tokens_total_sum
    # TODO: check if formula is OK
    tau = (docs_total_len / (n_topics * vocab_size)) * gimel / (1 - gimel)
    return tau


def compute_regularizer_tau(tokens_data, reg, modality_weights, n_topics):
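    """
    Treat reg.tau as a relative coefficient and convert it to an absolute one,
    dispatching on the regularizer type (Theta or Phi smoothing/sparsing).
    """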

    (modality_count, modality_vocab_size, num_docs) = tokens_data

    gimel = reg.tau

    if "SmoothSparseThetaRegularizer" in str(type(reg)):
        tau = theta_weight_rel2abs(tokens_data, modality_weights,
                                   n_topics, gimel)
        return tau
    elif "SmoothSparsePhiRegularizer" in str(type(reg)):
        if len(reg.class_ids):
            modalities_list = reg.class_ids
        else:
            modalities_list = modality_weights.keys()

        tau = phi_weight_rel2abs(tokens_data, modality_weights,
                                 n_topics, gimel, modalities_list)
        return tau
    elif "DecorrelatorPhiRegularizer" in str(type(reg)):
        raise ValueError("Decorrelator {} warrants further study".format(reg.name))
    else:
        raise KeyError("Invalid: {}".format(reg.name))


def compute_regularizer_gimel(tokens_data, reg, modality_weights, n_topics):
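    """
    Inverse of compute_regularizer_tau: recover the relative coefficient
    corresponding to the absolute reg.tau.
    """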

    (modality_count, modality_vocab_size, num_docs) = tokens_data

    if "SmoothSparseThetaRegularizer" in str(type(reg)):
        gimel = theta_weight_abs2rel(tokens_data, modality_weights,
                                     n_topics, reg.tau)
        return gimel
    elif "SmoothSparsePhiRegularizer" in str(type(reg)):
        if len(reg.class_ids):
            modalities_list = reg.class_ids
        else:
            modalities_list = modality_weights.keys()

        gimel = phi_weight_abs2rel(tokens_data, modality_weights,
                                   n_topics, reg.tau, modalities_list)
        return gimel
    elif "DecorrelatorPhiRegularizer" in str(type(reg)):
        raise ValueError("Decorrelator {} warrants further study".format(reg.name))
    else:
        raise KeyError("Invalid: {}".format(reg.name))


def transform_regularizer(tokens_data, reg, modality_weights, n_topics=None):
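    """
    Return a copy of the regularizer with tau recomputed from its relative
    value; note that class_ids are not carried over to the copy.
    """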

    if n_topics is None and len(reg.topic_names) == 0:
        raise ValueError('Number of topics to regularize should be specified')
    if n_topics is None:
        n_topics = len(reg.topic_names)

    (modality_count, modality_vocab_size, num_docs) = tokens_data

    new_tau = compute_regularizer_tau(tokens_data, reg, modality_weights, n_topics)
    reg_class = reg.__class__
    reg_copy = reg_class(
            tau=new_tau,
            name=reg.name,
            topic_names=reg.topic_names,
            # class_ids=reg.class_ids
    )
    return reg_copy


def modality_weight_rel2abs(tokens_data, weights, default_modality):
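    """
    Convert relative modality weights into absolute class_ids weights:
    tau = gimel * (token count of default_modality) / (token count of modality),
    with token counts taken from count_vocab_size.
    """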
    (modality_count, modality_vocab_size, num_docs) = tokens_data
    taus = {}
    default_weight = modality_count[default_modality]
    for modality in weights:
        if modality_count[modality]:
            gimel = weights[modality]
            tau = gimel * default_weight / modality_count[modality]
            taus[modality] = tau
        else:
            taus[modality] = 0
    return taus


def handle_regularizer(use_relative_coefficients, model, regularizer, data_stats):
    """
    Adds a regularizer to the model, handling all artm regularizers
    (any class whose name contains 'Regularizer') and, if requested,
    converting its relative coefficient into an absolute one.

    Parameters
    ----------
    use_relative_coefficients : bool
        indicates whether the regularizer coefficient should be converted
        from a relative to an absolute value
    model : TopicModel or artm.ARTM
        model to be changed in place
    regularizer
        an instance of a Regularizer from the artm library
    data_stats : tuple
        collection statistics as returned by count_vocab_size

    Returns
    -------
    None

    """

    fallback_options = (AttributeError, TypeError, AssertionError)
    try:
        n_topics = len(regularizer.topic_names)
        assert n_topics > 0
    except fallback_options:
        n_topics = len(model.topic_names)

    regularizer_type = str(type(regularizer))
    if use_relative_coefficients and 'SmoothSparse' in regularizer_type:
        regularizer = transform_regularizer(
            data_stats,
            regularizer,
            model.class_ids,
            n_topics,
        )

    model.regularizers.add(regularizer, overwrite=True)
    if 'Decorrelator' in regularizer_type:
        if use_relative_coefficients:
            model.regularizers[regularizer.name].gamma = 0
        else:
            model.regularizers[regularizer.name].gamma = None
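
The following is a minimal usage sketch (not part of the module source); the batches folder and modality names ('my_batches', '@lemmatized', '@bigrams') are hypothetical. It shows the typical workflow: gather collection statistics once with count_vocab_size, convert relative modality weights with modality_weight_rel2abs, and register a regularizer whose tau is given as a relative coefficient via handle_regularizer.

import artm

from topicnet.cooking_machine.rel_toolbox_lite import (
    count_vocab_size,
    handle_regularizer,
    modality_weight_rel2abs,
)

# Hypothetical batches folder and relative modality weights
dictionary = artm.Dictionary()
dictionary.gather(data_path='my_batches')
modality_weights = {'@lemmatized': 1.0, '@bigrams': 0.5}

data_stats = count_vocab_size(dictionary, modality_weights)

model = artm.ARTM(
    num_topics=20,
    class_ids=modality_weight_rel2abs(data_stats, modality_weights, '@lemmatized'),
    dictionary=dictionary,
)

# tau=0.1 is interpreted as a relative coefficient and converted to an
# absolute one before the regularizer is attached to the model
handle_regularizer(
    use_relative_coefficients=True,
    model=model,
    regularizer=artm.SmoothSparsePhiRegularizer(
        name='smooth_phi', tau=0.1, class_ids=['@lemmatized'],
    ),
    data_stats=data_stats,
)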

Functions

def calc_docs_avg_len(ds, weights)
def compute_regularizer_gimel(tokens_data, reg, modality_weights, n_topics)
def compute_regularizer_tau(tokens_data, reg, modality_weights, n_topics)
def count_vocab_size(dictionary, modalities)
def handle_regularizer(use_relative_coefficients, model, regularizer, data_stats)

def modality_weight_rel2abs(tokens_data, weights, default_modality)
def phi_weight_abs2rel(ds, modality_weights, n_topics, tau, modalities_list=None)
def phi_weight_rel2abs(ds, modality_weights, n_topics, gimel, modalities_list=None)
def theta_weight_abs2rel(ds, modality_weights, n_topics, tau)
def theta_weight_rel2abs(ds, modality_weights, n_topics, gimel)
def transform_regularizer(tokens_data, reg, modality_weights, n_topics=None)