Module topicnet.cooking_machine.config_parser
Parsing text file into Experiment instance using strictyaml
(github.com/crdoconnor/strictyaml/)

The aim here is to make config:

* possible to use even for non-programmers
* hard to misuse
* easily debuggable

Hence, the process of parsing config is a bit more complicated than
it could be, but it produces more useful error messages. For example:

    File $YOUR_CONFIG.yaml, line 42
        topic_names: 10
        ^ this value should be a 'list' instead of 'int'
    YAMLValidationError: 'int' passed instead of 'list'

instead of:

    File $SOME_FILE.py, line 666, in $SOME_FUNCTION
        for topic_name in topic_names:
    TypeError: 'int' object is not iterable

To achieve this, strictyaml makes use of various validators which keep
track of individual line numbers and of which fragments have already been
checked and which haven't.

Our process consists of three stages:

1) We check the high-level structure using BASE_SCHEMA.
   The presence of each required key is ensured.
   After this stage we can be sure that we can create a valid model
   using the specified parameters.
2) We make a second pass and revalidate 'regularizers' and 'stages'.
   This step is performed semi-automatically: using inspect,
   we extract everything from each __init__ method signature.
   For example,
       def __init__(self, num_iters: int = 5)
   allows us to infer that the num_iters parameter should be an int,
   but isn't strictly required.
3) We construct instances of the required classes, convert types manually
   and implement some shortcuts (a complete config sketch is shown below).
   Ideally, this stage should be performed using revalidate() as well,
   but it is currently a work in progress.
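For illustration, here is a hedged sketch of a complete config in the shape BASE_SCHEMA expects (as the contents of a hypothetical config.yaml). The regularizer and cube class names are real, but the parameter fields inside each block are illustrative: the exact set of accepted fields is derived from the corresponding __init__ signatures at parse time.

    topics:
        specific_topics: ["topic_0", "topic_1"]
        background_topics: ["bcg_0"]

    model:
        dataset_path: data/dataset.csv    # placeholder path
        modalities_to_use: ["@text"]
        main_modality: "@text"

    regularizers:
    - SmoothSparsePhiRegularizer:
        name: smooth_phi
        tau: 0.1
        topic_names: background_topics    # shortcut, resolved at stage 3

    stages:
    - RegularizersModifierCube:
        num_iter: 5                           # illustrative field
        regularizer_parameters:
            name: smooth_phi
            tau_grid: [0.1, 0.5, 1.0]
        selection:
        - "PerplexityScore@all -> min"        # illustrative selection criterion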
Source code
"""
Parsing text file into Experiment instance using strictyaml
(github.com/crdoconnor/strictyaml/)
The aim here is to make config:
* possible to use even for non-programmers
* hard to misuse
* easily debuggable
Hence, the process of parsing config is a bit more complicated than
it could be, but it produces more useful error messages. For example:
File $YOUR_CONFIG.yaml, line 42
topic_names: 10
^ this value should be a 'list' instead of 'int'
YAMLValidationError: 'int' passed instead of 'list'
instead of:
File $SOME_FILE.py, line 666, in $SOME_FUNCTION
for topic_name in topic_names:
TypeError: 'int' object is not iterable
To achieve this, strictyaml makes use of various validators which
keep track of individual line numbers and of which fragments have
already been checked and which haven't.
Our process consists of three stages:
1) we check the high-level structure using `BASE_SCHEMA`.
The presence of each required key is ensured.
After this stage we can be sure that we can create a valid model
using specified parameters.
2) we make a second pass and revalidate 'regularizers' and 'stages'
This step is performed semi-automatically: using `inspect`,
we extract everything from `__init__` method signature.
For example:
def __init__(self, num_iters: int = 5)
allows us to infer that num_iters parameter should be int,
but it isn't strictly required.
3) we construct instances of classes required, convert types manually
and implement some shortcuts.
Ideally, this stage should be performed using revalidate() as well,
but it's a work-in-progress currently.
""" # noqa: W291
from inspect import signature, Parameter
from typing import (
Callable,
Type,
)
from .cubes import (
CubeCreator,
RegularizersModifierCube,
GreedyStrategy,
PerplexityStrategy,
)
from .experiment import Experiment
from .dataset import Dataset
from .models import scores as tnscores
from .models import TopicModel
from .model_constructor import (
create_default_topics,
init_simple_default_model,
)
from .rel_toolbox_lite import (
count_vocab_size,
handle_regularizer,
)
import artm
from strictyaml import Map, Str, Int, Seq, Float, Bool
from strictyaml import Any, Optional, EmptyDict, EmptyNone, EmptyList
from strictyaml import dirty_load
SUPPORTED_CUBES = [CubeCreator, RegularizersModifierCube]
SUPPORTED_STRATEGIES = [PerplexityStrategy, GreedyStrategy]
TYPE_VALIDATORS = {
'int': Int(), 'bool': Bool(), 'str': Str(), 'float': Float()
}
def choose_key(param):
"""
Parameters
----------
param : inspect.Parameter
Returns
-------
str or strictyaml.Optional
"""
if param.default is not Parameter.empty:
return Optional(param.name)
return param.name
def choose_validator(param):
"""
Parameters
----------
param : inspect.Parameter
Returns
-------
instance of strictyaml.Validator
"""
if param.annotation is int:
return Int()
if param.annotation is float:
return Float()
if param.annotation is bool:
return Bool()
if param.annotation is str:
return Str()
if param.name in ARTM_TYPES:
return ARTM_TYPES[param.name]
return Any()
# TODO: maybe this is cool, but do we really need this?
def build_schema_from_function(func: Callable) -> dict:
from docstring_parser import parse as docstring_parse
func_params = signature(func).parameters
func_params_schema = dict()
for elem in docstring_parse(func.__doc__).params:
if elem.arg_name in func_params:
key = choose_key(func_params[elem.arg_name])
func_params_schema[key] = TYPE_VALIDATORS[elem.type_name]
return func_params_schema
# TODO: use stackoverflow.com/questions/37929851/parse-numpydoc-docstring-and-access-components
# for now just hardcode most common / important types
ARTM_TYPES = {
"tau": Float(),
"topic_names": Str() | Seq(Str()) | EmptyNone(),
# TODO: handle class_ids in model and in regularizers separately
"class_ids": Str() | Seq(Str()) | EmptyNone(),
"gamma": Float() | EmptyNone(),
"seed": Int(),
"num_document_passes": Int(),
"num_processors": Int(),
"cache_theta": Bool(),
"reuse_theta": Bool(),
"theta_name": Str()
}
_ELEMENT = Any()
# TODO: maybe better _DICTIONARY_FILTER_SCHEMA = build_schema_from_function(artm.Dictionary.filter)
# TODO: modalities, filter params - these all are dataset's options, not model's
# maybe make separate YML block for dataset?
BASE_SCHEMA = Map({
'regularizers': Seq(_ELEMENT),
Optional('scores'): Seq(_ELEMENT),
'stages': Seq(_ELEMENT),
'model': Map({
"dataset_path": Str(),
Optional("dictionary_filter_parameters"): Map({
Optional("class_id"): Str(),
Optional("min_df"): Float(),
Optional("max_df"): Float(),
Optional("min_df_rate"): Float(),
Optional("max_df_rate"): Float(),
Optional("min_tf"): Float(),
Optional("max_tf"): Float(),
Optional("max_dictionary_size"): Float(),
Optional("recalculate_value"): Bool(),
}),
Optional("keep_in_memory"): Bool(),
Optional("internals_folder_path"): Bool(),
Optional("modalities_to_use"): Seq(Str()),
Optional("modalities_weights"): Any(),
"main_modality": Str(),
}),
'topics': Map({
"background_topics": Seq(Str()) | Int() | EmptyList(),
"specific_topics": Seq(Str()) | Int() | EmptyList(),
})
})
KEY_DICTIONARY_FILTER_PARAMETERS = 'dictionary_filter_parameters'
def build_schema_from_signature(class_of_object, use_optional=True):
"""
Parameters
----------
class_of_object : class
Returns
-------
dict
each element is either str -> Validator or Optional(str) -> Validator
"""
choose_key_func = choose_key if use_optional else (lambda param: param.name)
return {choose_key_func(param): choose_validator(param)
for param in signature(class_of_object.__init__).parameters.values()
if param.name != 'self'}
def wrap_in_map(dictionary):
"""Wraps a schema dict in Map; allows an empty mapping if every key is Optional."""
could_be_empty = all(isinstance(key, Optional) for key in dictionary)
if could_be_empty:
return Map(dictionary) | EmptyDict()
return Map(dictionary)
def build_schema_for_scores():
"""
Returns
-------
dict
maps score class name to a strictyaml.Map used for validation and type-coercion
"""
schemas = {}
for elem in artm.scores.__all__:
if "Score" in elem:
class_of_object = getattr(artm.scores, elem)
# TODO: check if every key is Optional. If it is, then "| EmptyDict()"
# otherwise, just Map()
res = wrap_in_map(build_schema_from_signature(class_of_object))
specific_schema = Map({class_of_object.__name__: res})
schemas[class_of_object.__name__] = specific_schema
for elem in tnscores.__all__:
if "Score" in elem:
class_of_object = getattr(tnscores, elem)
res = build_schema_from_signature(class_of_object)
# res["name"] = Str() # TODO: support custom names
res = wrap_in_map(res)
specific_schema = Map({class_of_object.__name__: res})
schemas[class_of_object.__name__] = specific_schema
return schemas
def build_schema_for_regs():
"""
Returns
-------
dict
maps regularizer class name to a strictyaml.Map used for validation and type-coercion
"""
schemas = {}
for elem in artm.regularizers.__all__:
if "Regularizer" in elem:
class_of_object = getattr(artm.regularizers, elem)
res = build_schema_from_signature(class_of_object)
if elem in ["SmoothSparseThetaRegularizer", "SmoothSparsePhiRegularizer",
"DecorrelatorPhiRegularizer"]:
res[Optional("relative", default=None)] = Bool()
res = wrap_in_map(res)
specific_schema = Map({class_of_object.__name__: res})
schemas[class_of_object.__name__] = specific_schema
return schemas
def is_key_in_schema(key, schema):
"""Checks whether key is in schema, either as a plain key or wrapped in Optional."""
if key in schema:
return True
return any(
key_val.key == key for key_val in schema
if isinstance(key_val, Optional)
)
def build_schema_for_cubes():
"""
Returns
-------
dict
each element is str -> strictyaml.Map
where key is name of cube,
value is a schema used for validation and type-coercion
"""
schemas = {}
for class_of_object in SUPPORTED_CUBES:
res = build_schema_from_signature(class_of_object)
# "selection" isn't used in __init__, but we will need it later
res["selection"] = Seq(Str())
# shortcut for strategy initialization
if is_key_in_schema("strategy", res):
signature_validation = {}
for strategy_class in SUPPORTED_STRATEGIES:
local_signature_validation = build_schema_from_signature(strategy_class)
signature_validation.update(local_signature_validation)
res[Optional("strategy_params")] = Map(signature_validation)
# we will deal with "values" later, but we can check at least some simple things already
if class_of_object.__name__ == "CubeCreator":
element = Map({"name": Str(), "values": Seq(Any())})
res["parameters"] = Seq(element)
if class_of_object.__name__ == "RegularizersModifierCube":
element = Map({
Optional("name"): Str(),
Optional("regularizer"): Any(),
Optional("tau_grid"): Seq(Float())
})
res["regularizer_parameters"] = element | Seq(element)
res = Map(res)
specific_schema = Map({class_of_object.__name__: res})
schemas[class_of_object.__name__] = specific_schema
return schemas
def preprocess_parameters_for_cube_creator(elem_args):
"""
This function does two things:
1) convert class_ids from
name: class_ids@text, values: [0, 1, 2, 3]
to
name: class_ids, values: {"@text": [0, 1, 2, 3]}
2) type conversion for "values" field.
Parameters
----------
elem_args: strictyaml.YAML object
(contains dict inside)
Notes
-----
elem_args is revalidated in place; nothing is returned
"""
for param_portion in elem_args["parameters"]:
name = str(param_portion["name"])
if name.startswith("class_ids"):
validator = Float() | Seq(Float())
else:
validator = Seq(ARTM_TYPES[name])
param_schema = Map({
"name": Str(),
"values": validator
})
param_portion.revalidate(param_schema)
def handle_special_cases(elem_args, kwargs):
"""
In-place fixes kwargs, handling special cases and shortcuts
(only strategy for now)
Parameters
----------
elem_args: dict
kwargs: dict
"""
# special case: shortcut for strategy
if "strategy" in elem_args:
strategy = None
for strategy_class in SUPPORTED_STRATEGIES:
if strategy_class.__name__ == elem_args["strategy"]:
strat_schema = build_schema_from_signature(strategy_class, use_optional=False)
strat_kwargs = {}
for key, value in elem_args["strategy_params"].items():
key = str(key)
value.revalidate(strat_schema[key])
strat_kwargs[key] = value.data
strategy = strategy_class(**strat_kwargs)
kwargs["strategy"] = strategy # or None if failed to identify it
def build_score(elemtype, elem_args, is_artm_score):
"""
Parameters
----------
elemtype : str
name of score
elem_args: dict
is_artm_score: bool
Returns
-------
instance of artm.scores.BaseScore or topicnet.cooking_machine.models.base_score
"""
module = artm.scores if is_artm_score else tnscores
class_of_object = getattr(module, elemtype)
kwargs = {name: value
for name, value in elem_args.items()}
return class_of_object(**kwargs)
def build_regularizer(elemtype, elem_args, specific_topic_names, background_topic_names):
"""
Parameters
----------
elemtype : str
name of regularizer
elem_args: dict
specific_topic_names: list of str
background_topic_names: list of str
Returns
-------
instance of artm.Regularizer
"""
class_of_object = getattr(artm.regularizers, elemtype)
kwargs = {name: value
for name, value in elem_args.items()}
# special case: shortcut for topic_names
if "topic_names" in kwargs:
if kwargs["topic_names"] == "background_topics":
kwargs["topic_names"] = background_topic_names
if kwargs["topic_names"] == "specific_topics":
kwargs["topic_names"] = specific_topic_names
return class_of_object(**kwargs)
def build_cube_settings(elemtype, elem_args):
"""
Parameters
----------
elemtype : str
name of cube
elem_args: strictyaml.YAML object
(contains dict inside)
Returns
-------
list of dict
"""
if elemtype == "CubeCreator":
preprocess_parameters_for_cube_creator(elem_args)
kwargs = {name: value
for name, value in elem_args.data.items()
if name not in ['selection', 'strategy', 'strategy_params']}
handle_special_cases(elem_args, kwargs)
return {elemtype: kwargs,
"selection": elem_args['selection'].data}
def _add_parsed_scores(parsed, topic_model):
""" """
for score in parsed.data.get('scores', []):
for elemtype, elem_args in score.items():
is_artm_score = elemtype in artm.scores.__all__
score_object = build_score(elemtype, elem_args, is_artm_score)
if is_artm_score:
topic_model._model.scores.add(score_object, overwrite=True)
else:
topic_model.custom_scores[elemtype] = score_object
def _add_parsed_regularizers(
parsed, model, specific_topic_names, background_topic_names, data_stats
):
""" """
regularizers = []
for stage in parsed.data['regularizers']:
for elemtype, elem_args in stage.items():
should_be_relative = None
if "relative" in elem_args:
should_be_relative = elem_args["relative"]
elem_args.pop("relative")
regularizer_object = build_regularizer(
elemtype, elem_args, specific_topic_names, background_topic_names
)
handle_regularizer(should_be_relative, model, regularizer_object, data_stats)
regularizers.append(model.regularizers[regularizer_object.name])
return regularizers
def parse_modalities_data(parsed):
"""Extracts modalities_to_use or modalities_weights from the parsed config; exactly one must be specified."""
has_modalities_to_use = is_key_in_schema("modalities_to_use", parsed["model"])
has_weights = is_key_in_schema("modalities_weights", parsed["model"])
main_modality = parsed["model"]["main_modality"]
# exactly one should be specified
if has_modalities_to_use == has_weights:
raise ValueError("Either 'modalities_to_use' or 'modalities_weights' should be specified")
if has_weights:
modalities_to_use = list(parsed["model"]["modalities_weights"].data)
if main_modality not in modalities_to_use:
modalities_to_use.append(main_modality)
local_schema = Map({
key: Float() for key in modalities_to_use
})
parsed["model"]["modalities_weights"].revalidate(local_schema)
modalities_weights = parsed["model"]["modalities_weights"].data
return modalities_weights
else:
modalities_to_use = parsed.data["model"]["modalities_to_use"]
return modalities_to_use
def parse(
yaml_string: str,
force_separate_thread: bool = False,
dataset_class: Type[Dataset] = Dataset
):
"""
Parameters
----------
yaml_string : str
force_separate_thread : bool
dataset_class : class
Returns
-------
cube_settings: list of dict
regularizers: list
topic_model: TopicModel
dataset: Dataset
"""
parsed = dirty_load(yaml_string, BASE_SCHEMA, allow_flow_style=True)
specific_topic_names, background_topic_names = create_default_topics(
parsed.data["topics"]["specific_topics"],
parsed.data["topics"]["background_topics"]
)
revalidate_section(parsed, "stages")
revalidate_section(parsed, "regularizers")
if "scores" in parsed:
revalidate_section(parsed, "scores")
dataset = dataset_class(
data_path=parsed.data["model"]["dataset_path"],
keep_in_memory=parsed.data["model"].get("keep_in_memory", True),
internals_folder_path=parsed.data["model"].get("internals_folder_path", None),
)
filter_parameters = parsed.data["model"].get(
KEY_DICTIONARY_FILTER_PARAMETERS, dict()
)
if len(filter_parameters) > 0:
filtered_dictionary = dataset.get_dictionary().filter(**filter_parameters)
dataset._cached_dict = filtered_dictionary
modalities_to_use = parse_modalities_data(parsed)
data_stats = count_vocab_size(dataset.get_dictionary(), modalities_to_use)
model = init_simple_default_model(
dataset=dataset,
modalities_to_use=modalities_to_use,
main_modality=parsed.data["model"]["main_modality"],
specific_topics=parsed.data["topics"]["specific_topics"],
background_topics=parsed.data["topics"]["background_topics"],
)
regularizers = _add_parsed_regularizers(
parsed, model, specific_topic_names, background_topic_names, data_stats
)
topic_model = TopicModel(model)
_add_parsed_scores(parsed, topic_model)
cube_settings = list()
for stage in parsed['stages']:
for elemtype, elem_args in stage.items():
settings = build_cube_settings(elemtype.data, elem_args)
settings[elemtype]["separate_thread"] = force_separate_thread
cube_settings.append(settings)
return cube_settings, regularizers, topic_model, dataset
def revalidate_section(parsed, section):
"""
Performs in-place type coercion and validation
Parameters
----------
parsed : strictyaml.YAML object
(half-parsed, half-validated chunk of config)
section: str
"""
if section == "stages":
schemas = build_schema_for_cubes()
elif section == "regularizers":
schemas = build_schema_for_regs()
elif section == "scores":
schemas = build_schema_for_scores()
else:
raise ValueError(f"Unknown section name '{section}'")
for stage in parsed[section]:
assert len(stage) == 1
name = list(stage.data)[0]
if name not in schemas:
raise ValueError(f"Unsupported {section} value: {name} at line {stage.start_line}")
local_schema = schemas[name]
stage.revalidate(local_schema)
def build_experiment_environment_from_yaml_config(
yaml_string,
experiment_id,
save_path,
force_separate_thread=False,
):
"""
Wraps up parameter extraction and class instances creation
from yaml formatted string
together with the method that builds experiment pipeline from
given experiment parameters (model, cubes, regularizers, etc)
Parameters
----------
yaml_string: str
config that contains the whole experiment pipeline description
with its parameters
save_path: str
path to the folder to save experiment logs and models
experiment_id: str
name of the experiment folder
force_separate_thread: bool default = False
experimental feature that packs model training into
separate process which is killed upon training completion
by default is not used
Returns
-------
tuple experiment, dataset instances of corresponding classes from topicnet
"""
settings, regs, model, dataset = parse(yaml_string, force_separate_thread)
# TODO: handle dynamic addition of regularizers
experiment = Experiment(experiment_id=experiment_id, save_path=save_path, topic_model=model)
experiment.build(settings)
return experiment, dataset
Functions
def build_cube_settings(elemtype, elem_args)
Parameters
----------
elemtype : str
    name of cube
elem_args : strictyaml.YAML object
    (contains dict inside)

Returns
-------
list of dict
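Examples
--------
A hedged sketch of the returned structure (the field names inside the cube block are illustrative):

    settings = build_cube_settings("RegularizersModifierCube", elem_args)
    # roughly:
    # {
    #     "RegularizersModifierCube": {"num_iter": 5, ...},
    #     "selection": ["PerplexityScore@all -> min"],
    # }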
def build_experiment_environment_from_yaml_config(yaml_string, experiment_id, save_path, force_separate_thread=False)
Wraps up parameter extraction and creation of class instances from a
YAML-formatted string, together with the method that builds the experiment
pipeline from the given experiment parameters (model, cubes, regularizers, etc.)

Parameters
----------
yaml_string : str
    config that contains the whole experiment pipeline description
    with its parameters
experiment_id : str
    name of the experiment folder
save_path : str
    path to the folder to save experiment logs and models
force_separate_thread : bool, default = False
    experimental feature that packs model training into a separate
    process which is killed upon training completion;
    not used by default

Returns
-------
tuple (experiment, dataset)
    instances of the corresponding classes from topicnet
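Examples
--------
A minimal usage sketch; the file name, experiment name and save path are placeholders:

    with open("config.yaml") as file:
        yaml_string = file.read()

    experiment, dataset = build_experiment_environment_from_yaml_config(
        yaml_string,
        experiment_id="my_experiment",
        save_path="experiments",
    )
    # the returned Experiment already has its pipeline built from 'stages'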
def build_regularizer(elemtype, elem_args, specific_topic_names, background_topic_names)
Parameters
----------
elemtype : str
    name of regularizer
elem_args : dict
specific_topic_names : list of str
background_topic_names : list of str

Returns
-------
instance of artm.Regularizer
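Examples
--------
A sketch of the topic_names shortcut (assumes artm is installed; the parameter values are illustrative):

    reg = build_regularizer(
        "SmoothSparsePhiRegularizer",
        {"name": "smooth_phi", "tau": 0.1, "topic_names": "background_topics"},
        specific_topic_names=["topic_0", "topic_1"],
        background_topic_names=["bcg_0"],
    )
    # the "background_topics" shortcut is replaced by the actual list,
    # so the regularizer is built with topic_names=["bcg_0"]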
def build_schema_for_cubes()
Returns
-------
dict
    each element is str -> strictyaml.Map,
    where key is name of cube,
    value is a schema used for validation and type-coercion
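Examples
--------
Since the set of cubes is fixed by SUPPORTED_CUBES, the result has exactly two entries:

    schemas = build_schema_for_cubes()
    sorted(schemas)
    # ['CubeCreator', 'RegularizersModifierCube']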
def build_schema_for_regs()
Returns
-------
dict
    maps regularizer class name to a strictyaml.Map
    used for validation and type-coercion
def build_schema_for_scores()
Returns
-------
dict
    maps score class name to a strictyaml.Map
    used for validation and type-coercion
def build_schema_from_function(func: Callable) ‑> dict
Builds a strictyaml schema dict from a function's signature and its
numpydoc docstring (parameter types are taken from the docstring;
requires the docstring_parser package)
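Examples
--------
A sketch with a hypothetical function:

    def my_func(threshold=0.5):
        """
        Parameters
        ----------
        threshold : float
            cutoff value
        """

    schema = build_schema_from_function(my_func)
    # {Optional("threshold"): Float()} -- Optional because a default exists,
    # Float() because the docstring declares the type as 'float'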
def build_schema_from_signature(class_of_object, use_optional=True)
Parameters
----------
class_of_object : class

Returns
-------
dict
    each element is either str -> Validator or Optional(str) -> Validator
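Examples
--------
A sketch with a hypothetical class:

    class Demo:
        def __init__(self, num_iters: int = 5, tau: float = 0.1):
            pass

    build_schema_from_signature(Demo)
    # {Optional("num_iters"): Int(), Optional("tau"): Float()}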
def build_score(elemtype, elem_args, is_artm_score)
Parameters
----------
elemtype : str
    name of score
elem_args : dict
is_artm_score : bool

Returns
-------
instance of artm.scores.BaseScore or topicnet.cooking_machine.models.base_score
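Examples
--------
A sketch (assumes artm is installed):

    score = build_score("PerplexityScore", {"name": "perplexity_score"}, is_artm_score=True)
    # equivalent to artm.scores.PerplexityScore(name="perplexity_score")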
def choose_key(param)
Parameters
----------
param : inspect.Parameter

Returns
-------
str or strictyaml.Optional
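Examples
--------
A sketch with a hypothetical function:

    from inspect import signature

    def f(required_arg, optional_arg=5):
        pass

    params = signature(f).parameters
    choose_key(params["required_arg"])    # 'required_arg' -- no default, so the key is required
    choose_key(params["optional_arg"])    # Optional('optional_arg') -- a default exists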
def choose_validator(param)
Parameters
----------
param : inspect.Parameter

Returns
-------
instance of strictyaml.Validator
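Examples
--------
A sketch with a hypothetical function:

    from inspect import signature

    def g(num_iters: int = 5, tau=0.1, whatever=None):
        pass

    params = signature(g).parameters
    choose_validator(params["num_iters"])    # Int()   -- inferred from the annotation
    choose_validator(params["tau"])          # Float() -- no annotation, but 'tau' is in ARTM_TYPES
    choose_validator(params["whatever"])     # Any()   -- nothing is known about it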
def handle_special_cases(elem_args, kwargs)
In-place fixes kwargs, handling special cases and shortcuts
(only strategy for now)

Parameters
----------
elem_args : dict
kwargs : dict
def is_key_in_schema(key, schema)
Checks whether key is present in schema, either as a plain key
or wrapped in strictyaml.Optional
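Examples
--------
A small sketch:

    from strictyaml import Int, Optional, Str

    schema = {"name": Str(), Optional("num_iters"): Int()}
    is_key_in_schema("name", schema)         # True: plain key
    is_key_in_schema("num_iters", schema)    # True: found inside Optional
    is_key_in_schema("missing", schema)      # False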
def parse(yaml_string: str, force_separate_thread: bool = False, dataset_class: Type[Dataset] = topicnet.cooking_machine.dataset.Dataset)
Parameters
----------
yaml_string : str
force_separate_thread : bool
dataset_class : class

Returns
-------
cube_settings : list of dict
regularizers : list
topic_model : TopicModel
dataset : Dataset
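Examples
--------
A hedged sketch (the file name is a placeholder; see the module description for a config example):

    with open("config.yaml") as file:
        yaml_string = file.read()

    cube_settings, regularizers, topic_model, dataset = parse(yaml_string)
    # cube_settings feeds Experiment.build(); the regularizers are already
    # attached to the underlying artm model of topic_model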
def parse_modalities_data(parsed)
Extracts modalities data from the parsed config: returns modalities_weights
if 'modalities_weights' is specified, otherwise modalities_to_use
(exactly one of the two must be present)
def preprocess_parameters_for_cube_creator(elem_args)
This function does two things:
1) convert class_ids from
       name: class_ids@text, values: [0, 1, 2, 3]
   to
       name: class_ids, values: {"@text": [0, 1, 2, 3]}
2) type conversion for "values" field

Parameters
----------
elem_args : strictyaml.YAML object
    (contains dict inside)

Notes
-----
elem_args is revalidated in place; nothing is returned
def revalidate_section(parsed, section)
Performs in-place type coercion and validation

Parameters
----------
parsed : strictyaml.YAML object
    (half-parsed, half-validated chunk of config)
section : str
def wrap_in_map(dictionary)
Wraps a schema dict in strictyaml.Map; if every key is Optional,
an empty mapping is also allowed
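Examples
--------
A small sketch:

    from strictyaml import Int, Optional, Str

    wrap_in_map({Optional("num_iters"): Int()})    # Map(...) | EmptyDict(): the block may be empty
    wrap_in_map({"name": Str()})                   # plain Map(...): 'name' is required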