Module topicnet.cooking_machine.models.semantic_radius_score
Source code
import artm
import operator
import functools
import numpy as np
import pandas as pd
from collections import Counter, OrderedDict
from scipy.optimize import curve_fit
from .base_score import BaseScore
def calculate_n(model, batch_vectorizer):
    """
    Calculate all necessary statistics from the batches. This may take some time.
    """
    # Collect the raw token weights n_dw for every document in every batch
    doc2token = {}

    for batch_id in range(len(batch_vectorizer._batches_list)):
        batch_name = batch_vectorizer._batches_list[batch_id]._filename
        batch = artm.messages.Batch()

        with open(batch_name, "rb") as f:
            batch.ParseFromString(f.read())

        for item_id in range(len(batch.item)):
            item = batch.item[item_id]
            theta_item_id = getattr(item, model.theta_columns_naming)
            doc2token[theta_item_id] = {'tokens': [], 'weights': []}

            for token_id, token_weight in zip(item.token_id, item.token_weight):
                doc2token[theta_item_id]['tokens'].append(batch.token[token_id])
                doc2token[theta_item_id]['weights'].append(token_weight)

    # Get p(t|d,w) for every document-token pair
    previous_num_document_passes = model._num_document_passes
    model._num_document_passes = 10
    ptdw = model.transform(batch_vectorizer=batch_vectorizer, theta_matrix_type='dense_ptdw')
    model._num_document_passes = previous_num_document_passes

    docs = ptdw.columns
    docs_unique = OrderedDict.fromkeys(docs).keys()

    tokens = [doc2token[doc_id]['tokens'] for doc_id in docs_unique]
    tokens = functools.reduce(operator.iconcat, tokens, [])

    ndw = np.concatenate([np.array(doc2token[doc_id]['weights']) for doc_id in docs_unique])
    ndw = np.tile(ndw, (ptdw.shape[0], 1))

    # n_tdw = n_dw * p(t|d,w); aggregate it into n_td, n_wt and n_t
    ptdw.columns = pd.MultiIndex.from_arrays([docs, tokens], names=('doc', 'token'))
    ntdw = ptdw * ndw
    ntd = ntdw.groupby(level=0, axis=1).sum()
    nwt = ntdw.groupby(level=1, axis=1).sum().T
    nt = nwt.sum(axis=0)

    return ntdw, ntd, nwt, nt
def synthetic_doc_ntdw_and_ntd(doc_len, nwt):
    """
    Create a synthetic document from nwt with the specified doc_len.
    """
    # Sample doc_len token occurrences from the topic's token distribution
    pwt = np.float64(nwt) / np.sum(np.float64(nwt)).astype(float)
    doc_idx = np.random.choice(len(pwt), doc_len, p=pwt)
    doc_count = dict(Counter(doc_idx))

    ntdw = np.empty((len(pwt)))
    for word_idx in range(len(ntdw)):
        ntdw[word_idx] = doc_count.get(word_idx, 0)
    ntd = np.sum(ntdw)

    return ntdw, ntd
def cressie_reed_sampled(topic, ntdw_calc, ntd_calc, nwt, nt, gimel=-1/2):
    """
    Calculate the Cressie-Read divergence for a sampled pseudo-document.
    """
    # n_td * n_wt for every token of the topic
    mul_part = ntd_calc * nwt.iloc[:, topic]

    if np.all(ntdw_calc == 0) or nt[topic] == 0 or np.all(mul_part == 0):
        gimel_part = np.array([0])
    else:
        gimel_part = 0
        for token_id, token in enumerate(nwt.index):
            token_ntdw = ntdw_calc[token_id]
            token_denom = mul_part.iloc[token_id]
            if token_ntdw and token_denom:
                gimel_part += token_ntdw * (
                    np.power(token_ntdw * nt[topic] / token_denom, gimel) - 1
                )

    cressie_reed_for_l = 2 / (gimel * (gimel + 1)) * np.sum(gimel_part)

    return cressie_reed_for_l
def third_degree(x, a, b, c, d):
    return a + b * x + c * x ** 2 + d * x ** 3
def radius_vs_ndt(topic, max_len, sample_step, sample_size, nwt, nt, alpha):
    """
    Calculate a third-degree approximation for the radius vs ntd dependency.
    """
    crs_for_alpha = []
    ntds_sampled = []

    for doc_len in range(1, max_len, sample_step):
        local_crs_for_alpha = []

        for _ in range(sample_size):
            ntdw_sampled, ntd_sampled = synthetic_doc_ntdw_and_ntd(doc_len, nwt.iloc[:, topic])
            local_crs_for_alpha.append(cressie_reed_sampled(
                topic, ntdw_sampled, ntd_sampled, nwt, nt
            ))

        crs_for_alpha.append(np.quantile(local_crs_for_alpha, 1 - alpha))
        ntds_sampled.append(ntd_sampled)

    regression_coeff, cov = curve_fit(third_degree, ntds_sampled, crs_for_alpha)

    return regression_coeff
def radii_vs_ntd(max_len, sample_step, sample_size, nwt, nt, alpha):
    regression_coeffs = []
    for topic in range(len(nt)):
        regression_coeffs.append(radius_vs_ndt(
            topic, max_len, sample_step, sample_size, nwt, nt, alpha
        ))
    return regression_coeffs


def radius_for_ntd(ntd, regression_coeff):
    return third_degree(ntd, *regression_coeff)


def radii_for_ntd(ntd, regression_coeff):
    return ntd.apply(lambda x: third_degree(x, *regression_coeff))
class SemanticRadiusScore(BaseScore):
    """
    This score implements the cluster semantic radius described in the paper
    'Проверка гипотезы условной независимости
    для оценивания качества тематической кластеризации' by A. Rogozina.
    At its core, this score helps to discover topic uniformity.
    The lower this score, the better
    """  # noqa: W291
    def __init__(self, batch_vectorizer, name: str = None):
        """
        Parameters
        ----------
        name:
            Name of the score
        batch_vectorizer:
            artm.BatchVectorizer with the batches to compute the score on
        """
        super().__init__(name=name)

        self.batch_vectorizer = batch_vectorizer

    def __repr__(self):
        return f'{self.__class__.__name__}(batch_vectorizer={self.batch_vectorizer!r})'

    def update(self, score):
        known_errors = (ValueError, TypeError)

        try:
            score = np.array(score, float)
        except known_errors:
            raise ValueError(f'Score call should return a list of float, not {score}')

        self.value.append(score)

    def call(self, model, max_sampled_document_len=None, sample_step=5, sample_size=3, alpha=0.1):
        """
        Parameters
        ----------
        model : TopicModel
        max_sampled_document_len : int
            Maximum length of pseudo-document for quantile regression
            (Default value = None)
        sample_step : int
            Grain for quantile regression
            (Default value = 5)
        sample_size : int
            Size of every sample for quantile regression
            (Default value = 3)
        alpha : float
            (1 - alpha) quantile level, must be <= 1
            (Default value = 0.1)
        """  # noqa: W291
        ntdw, ntd, nwt, nt = calculate_n(model._model, self.batch_vectorizer)

        if max_sampled_document_len is None:
            max_sampled_document_len = int(np.max(ntd.values))

        regression_coeffs = radii_vs_ntd(
            max_sampled_document_len, sample_step, sample_size, nwt, nt, alpha
        )
        radii = [
            radius_for_ntd(topic_ntd, coeff)
            for topic_ntd, coeff
            in zip(ntd.values.mean(axis=1), regression_coeffs)
        ]

        return radii
Functions
def calculate_n(model, batch_vectorizer)
-
Calculate all necessary statistics from the batches. This may take some time.
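For reference, the counters returned by calculate_n satisfy the following relations (a restatement of the aggregation in the source above, where p(t | d, w) comes from the model's dense ptdw matrix and n_dw are the token weights read from the batches):
n_{tdw} = n_{dw} \, p(t \mid d, w), \qquad n_{td} = \sum_{w} n_{tdw}, \qquad n_{wt} = \sum_{d} n_{tdw}, \qquad n_{t} = \sum_{w} n_{wt}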
def cressie_reed_sampled(topic, ntdw_calc, ntd_calc, nwt, nt, gimel=-0.5)
-
Calculate the Cressie-Read divergence for a sampled pseudo-document.
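Writing gimel as \gamma (default \gamma = -1/2), the returned value is the Cressie-Read power divergence between the sampled pseudo-document and the topic; terms with zero n_{tdw} or a zero denominator are skipped, exactly as in the source above:
CR_{\gamma} = \frac{2}{\gamma(\gamma + 1)} \sum_{w} n_{tdw} \left( \left( \frac{n_{tdw} \, n_{t}}{n_{td} \, n_{wt}} \right)^{\gamma} - 1 \right)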
def radii_for_ntd(ntd, regression_coeff)
-
Apply the fitted third-degree radius curve to every value in a Series of document lengths ntd.
def radii_vs_ntd(max_len, sample_step, sample_size, nwt, nt, alpha)
-
Fit the radius vs ntd regression (via radius_vs_ndt) for every topic.
def radius_for_ntd(ntd, regression_coeff)
-
Evaluate the fitted third-degree radius curve at a single ntd value.
def radius_vs_ndt(topic, max_len, sample_step, sample_size, nwt, nt, alpha)
-
Calculate a third-degree approximation for the radius vs ntd dependency.
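A minimal sketch of fitting and evaluating the radius curve for a single topic on toy counters (the nwt values and hyperparameters below are invented for illustration; topicnet and its artm dependency must be importable):

import pandas as pd
from topicnet.cooking_machine.models.semantic_radius_score import (
    radius_for_ntd,
    radius_vs_ndt,
)

# Toy token-topic counters: 5 tokens x 2 topics
# (integer topic labels keep the nt[topic] lookup simple)
nwt = pd.DataFrame(
    [[8.0, 1.0], [5.0, 2.0], [3.0, 4.0], [1.0, 6.0], [1.0, 9.0]],
    index=['cat', 'dog', 'fish', 'car', 'road'],
    columns=[0, 1],
)
nt = nwt.sum(axis=0)

# Fit the third-degree radius curve for topic 0 from sampled pseudo-documents
coeff = radius_vs_ndt(
    topic=0, max_len=40, sample_step=5, sample_size=3, nwt=nwt, nt=nt, alpha=0.1,
)

# Evaluate the fitted curve at some document length ntd
print(radius_for_ntd(20, coeff))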
def synthetic_doc_ntdw_and_ntd(doc_len, nwt)
-
Create a synthetic document from nwt with the specified doc_len.
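A small illustrative call on toy counts for one topic (again assuming topicnet and artm are importable); the sampled pseudo-document always contains exactly doc_len token occurrences:

import pandas as pd
from topicnet.cooking_machine.models.semantic_radius_score import synthetic_doc_ntdw_and_ntd

# Hypothetical per-topic token counts n_wt
topic_nwt = pd.Series([8.0, 5.0, 3.0, 1.0], index=['cat', 'dog', 'fish', 'car'])
ntdw, ntd = synthetic_doc_ntdw_and_ntd(doc_len=10, nwt=topic_nwt)

assert ntd == 10 and ntdw.sum() == 10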
def third_degree(x, a, b, c, d)
-
Third-degree polynomial used as the regression model.
Classes
class SemanticRadiusScore (batch_vectorizer, name: str = None)
-
This score implements the cluster semantic radius described in the paper 'Проверка гипотезы условной независимости для оценивания качества тематической кластеризации' by A. Rogozina. At its core, this score helps to discover topic uniformity. The lower this score, the better.
Parameters
name : str
    Name of the score
batch_vectorizer : artm.BatchVectorizer
    Batches to compute the score on
Ancestors
BaseScore
Methods
def call(self, model, max_sampled_document_len=None, sample_step=5, sample_size=3, alpha=0.1)
-
Parameters
model : TopicModel
max_sampled_document_len : int
    Maximum length of pseudo-document for quantile regression
    (Default value = None)
sample_step : int
    Grain for quantile regression
    (Default value = 5)
sample_size : int
    Size of every sample for quantile regression
    (Default value = 3)
alpha : float
    (1 - alpha) quantile level, must be <= 1
    (Default value = 0.1)
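A hedged usage sketch: the batches directory path is hypothetical, topic_model is assumed to be an already fitted topicnet TopicModel, and the score is called directly rather than through a training pipeline:

import artm
from topicnet.cooking_machine.models.semantic_radius_score import SemanticRadiusScore

# Hypothetical directory with previously created BigARTM batches
batch_vectorizer = artm.BatchVectorizer(data_path='my_batches/', data_format='batches')

score = SemanticRadiusScore(batch_vectorizer, name='semantic_radius')

# topic_model: an already fitted topicnet TopicModel (not defined in this sketch)
radii = score.call(topic_model)  # one radius per topic; lower is better
score.update(radii)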
Inherited members