diff --git a/common/setups/rasr/hybrid_system.py b/common/setups/rasr/hybrid_system.py index 8ad96549f..cd082144f 100644 --- a/common/setups/rasr/hybrid_system.py +++ b/common/setups/rasr/hybrid_system.py @@ -21,17 +21,21 @@ add_tf_flow_to_base_flow, ) from i6_core.util import MultiPath, MultiOutputPath +from i6_core.mm import CreateDummyMixturesJob +from i6_core.returnn import ReturnnComputePriorJobV2 -from .nn_system import NnSystem +from .hybrid_decoder import HybridDecoder +from .nn_system import NnSystem, returnn_training from .util import ( RasrInitArgs, ReturnnRasrDataInput, - OggZipHdfDataInput, HybridArgs, NnRecogArgs, RasrSteps, NnForcedAlignArgs, + ReturnnTrainingJobArgs, + AllowedReturnnTrainingDataInput, ) # -------------------- Init -------------------- @@ -90,11 +94,13 @@ def __init__( self.cv_corpora = [] self.devtrain_corpora = [] - self.train_input_data = None # type:Optional[Dict[str, ReturnnRasrDataInput]] - self.cv_input_data = None # type:Optional[Dict[str, ReturnnRasrDataInput]] - self.devtrain_input_data = None # type:Optional[Dict[str, ReturnnRasrDataInput]] - self.dev_input_data = None # type:Optional[Dict[str, ReturnnRasrDataInput]] - self.test_input_data = None # type:Optional[Dict[str, ReturnnRasrDataInput]] + self.train_input_data: Optional[Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]]] = None + self.cv_input_data: Optional[Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]]] = None + self.devtrain_input_data: Optional[ + Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]] + ] = None + self.dev_input_data: Optional[Dict[str, ReturnnRasrDataInput]] = None + self.test_input_data: Optional[Dict[str, ReturnnRasrDataInput]] = None self.train_cv_pairing = None @@ -128,9 +134,9 @@ def _add_output_alias_for_train_job( def init_system( self, rasr_init_args: RasrInitArgs, - train_data: Dict[str, Union[ReturnnRasrDataInput, OggZipHdfDataInput]], - cv_data: Dict[str, 
Union[ReturnnRasrDataInput, OggZipHdfDataInput]], - devtrain_data: Optional[Dict[str, Union[ReturnnRasrDataInput, OggZipHdfDataInput]]] = None, + train_data: Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]], + cv_data: Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]], + devtrain_data: Optional[Dict[str, Union[ReturnnRasrDataInput, AllowedReturnnTrainingDataInput]]] = None, dev_data: Optional[Dict[str, ReturnnRasrDataInput]] = None, test_data: Optional[Dict[str, ReturnnRasrDataInput]] = None, train_cv_pairing: Optional[List[Tuple[str, ...]]] = None, # List[Tuple[trn_c, cv_c, name, dvtr_c]] @@ -211,27 +217,29 @@ def generate_lattices(self): def returnn_training( self, - name, - returnn_config, - nn_train_args, + name: str, + returnn_config: returnn.ReturnnConfig, + nn_train_args: Union[Dict, ReturnnTrainingJobArgs], train_corpus_key, cv_corpus_key, devtrain_corpus_key=None, - ): - assert isinstance(returnn_config, returnn.ReturnnConfig) - - returnn_config.config["train"] = self.train_input_data[train_corpus_key].get_data_dict() - returnn_config.config["dev"] = self.cv_input_data[cv_corpus_key].get_data_dict() - if devtrain_corpus_key is not None: - returnn_config.config["eval_datasets"] = { - "devtrain": self.devtrain_input_data[devtrain_corpus_key].get_data_dict() - } - - train_job = returnn.ReturnnTrainingJob( + ) -> returnn.ReturnnTrainingJob: + if isinstance(nn_train_args, ReturnnTrainingJobArgs): + if nn_train_args.returnn_root is None: + nn_train_args.returnn_root = self.returnn_root + if nn_train_args.returnn_python_exe is None: + nn_train_args.returnn_python_exe = self.returnn_python_exe + + train_job = returnn_training( + name=name, returnn_config=returnn_config, - returnn_root=self.returnn_root, - returnn_python_exe=self.returnn_python_exe, - **nn_train_args, + training_args=nn_train_args, + train_data=self.train_input_data[train_corpus_key], + cv_data=self.cv_input_data[cv_corpus_key], + 
additional_data={"devtrain": self.devtrain_input_data[devtrain_corpus_key]} + if devtrain_corpus_key is not None + else None, + register_output=False, ) self._add_output_alias_for_train_job( train_job=train_job, @@ -346,7 +354,9 @@ def nn_recognition( name: str, returnn_config: returnn.ReturnnConfig, checkpoints: Dict[int, returnn.Checkpoint], - acoustic_mixture_path: tk.Path, # TODO maybe Optional if prior file provided -> automatically construct dummy file + acoustic_mixture_path: Optional[ + tk.Path + ], # TODO maybe Optional if prior file provided -> automatically construct dummy file prior_scales: List[float], pronunciation_scales: List[float], lm_scales: List[float], @@ -362,6 +372,7 @@ def nn_recognition( use_epoch_for_compile=False, forward_output_layer="output", native_ops: Optional[List[str]] = None, + train_job: Optional[Union[returnn.ReturnnTrainingJob, returnn.ReturnnRasrTrainingJob]] = None, **kwargs, ): with tk.block(f"{name}_recognition"): @@ -383,17 +394,37 @@ def nn_recognition( epochs = epochs if epochs is not None else list(checkpoints.keys()) for pron, lm, prior, epoch in itertools.product(pronunciation_scales, lm_scales, prior_scales, epochs): - assert epoch in checkpoints.keys() - assert acoustic_mixture_path is not None - - if use_epoch_for_compile: - tf_graph = self.nn_compile_graph(name, returnn_config, epoch=epoch) + assert epoch in checkpoints.keys() + prior_file = None + lmgc_scorer = None + if acoustic_mixture_path is None: + assert train_job is not None, "Need ReturnnTrainingJob for computation of priors" + tmp_acoustic_mixture_path = CreateDummyMixturesJob( + num_mixtures=returnn_config.config["extern_data"]["classes"]["dim"], + num_features=returnn_config.config["extern_data"]["data"]["dim"], + ).out_mixtures + lmgc_scorer = rasr.GMMFeatureScorer(tmp_acoustic_mixture_path) + prior_job = ReturnnComputePriorJobV2( + model_checkpoint=checkpoints[epoch], + returnn_config=train_job.returnn_config, + 
returnn_python_exe=train_job.returnn_python_exe, + returnn_root=train_job.returnn_root, + log_verbosity=train_job.returnn_config.post_config["log_verbosity"], + ) + prior_job.add_alias("extract_nn_prior/" + name) + prior_file = prior_job.out_prior_xml_file + else: + tmp_acoustic_mixture_path = acoustic_mixture_path scorer = rasr.PrecomputedHybridFeatureScorer( - prior_mixtures=acoustic_mixture_path, + prior_mixtures=tmp_acoustic_mixture_path, # This needs to be a new variable otherwise nesting causes undesired behavior priori_scale=prior, + prior_file=prior_file, ) + if use_epoch_for_compile: + tf_graph = self.nn_compile_graph(name, returnn_config, epoch=epoch) + tf_flow = make_precomputed_hybrid_tf_feature_flow( tf_checkpoint=checkpoints[epoch], tf_graph=tf_graph, @@ -419,6 +450,8 @@ def nn_recognition( parallelize_conversion=parallelize_conversion, rtf=rtf, mem=mem, + lmgc_alias=f"lmgc/{name}/{recognition_corpus_key}-{recog_name}", + lmgc_scorer=lmgc_scorer, **kwargs, ) @@ -429,14 +462,21 @@ def nn_recog( returnn_config: Path, checkpoints: Dict[int, returnn.Checkpoint], step_args: HybridArgs, + train_job: Union[returnn.ReturnnTrainingJob, returnn.ReturnnRasrTrainingJob], ): for recog_name, recog_args in step_args.recognition_args.items(): + recog_args = copy.deepcopy(recog_args) + whitelist = recog_args.pop("training_whitelist", None) + if whitelist: + if train_name not in whitelist: + continue for dev_c in self.dev_corpora: self.nn_recognition( name=f"{train_corpus_key}-{train_name}-{recog_name}", returnn_config=returnn_config, checkpoints=checkpoints, acoustic_mixture_path=self.train_input_data[train_corpus_key].acoustic_mixtures, + train_job=train_job, recognition_corpus_key=dev_c, **recog_args, ) @@ -452,6 +492,7 @@ def nn_recog( returnn_config=returnn_config, checkpoints=checkpoints, acoustic_mixture_path=self.train_input_data[train_corpus_key].acoustic_mixtures, + train_job=train_job, recognition_corpus_key=tst_c, **r_args, ) @@ -472,7 +513,7 @@ def 
nn_compile_graph( :return: the TF graph """ graph_compile_job = returnn.CompileTFGraphJob( - returnn_config, + returnn_config=returnn_config, epoch=epoch, returnn_root=self.returnn_root, returnn_python_exe=self.returnn_python_exe, @@ -509,7 +550,7 @@ def run_nn_step(self, step_name: str, step_args: HybridArgs): train_corpus_key=trn_c, cv_corpus_key=cv_c, ) - else: + elif isinstance(self.train_input_data[trn_c], AllowedReturnnTrainingDataInput): returnn_train_job = self.returnn_training( name=name, returnn_config=step_args.returnn_training_configs[name], @@ -518,6 +559,8 @@ def run_nn_step(self, step_name: str, step_args: HybridArgs): cv_corpus_key=cv_c, devtrain_corpus_key=dvtr_c, ) + else: + raise NotImplementedError returnn_recog_config = step_args.returnn_recognition_configs.get( name, step_args.returnn_training_configs[name] @@ -529,6 +572,7 @@ def run_nn_step(self, step_name: str, step_args: HybridArgs): returnn_config=returnn_recog_config, checkpoints=returnn_train_job.out_checkpoints, step_args=step_args, + train_job=returnn_train_job, ) def run_nn_recog_step(self, step_args: NnRecogArgs): diff --git a/common/setups/rasr/nn_system.py b/common/setups/rasr/nn_system.py index 13c3d239d..93321e823 100644 --- a/common/setups/rasr/nn_system.py +++ b/common/setups/rasr/nn_system.py @@ -1,36 +1,19 @@ -__all__ = ["NnSystem"] +__all__ = ["NnSystem", "returnn_training"] import copy -import itertools -import sys from dataclasses import asdict -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Union # -------------------- Sisyphus -------------------- -import sisyphus.toolkit as tk -import sisyphus.global_settings as gs - -from sisyphus.delayed_ops import DelayedFormat +from sisyphus import tk, gs # -------------------- Recipes -------------------- -import i6_core.features as features -import i6_core.rasr as rasr import i6_core.returnn as returnn -from i6_core.util import MultiPath, MultiOutputPath - from .rasr_system import 
RasrSystem - -from .util import ( - RasrInitArgs, - ReturnnRasrDataInput, - OggZipHdfDataInput, - HybridArgs, - NnRecogArgs, - RasrSteps, -) +from .util import ReturnnTrainingJobArgs, AllowedReturnnTrainingDataInput # -------------------- Init -------------------- @@ -95,3 +78,35 @@ def get_native_ops(self, op_names: Optional[List[str]]) -> Optional[List[tk.Path if op_name not in self.native_ops.keys(): self.compile_native_op(op_name) return [self.native_ops[op_name] for op_name in op_names] + + +def returnn_training( + name: str, + returnn_config: returnn.ReturnnConfig, + training_args: Union[Dict, ReturnnTrainingJobArgs], + train_data: AllowedReturnnTrainingDataInput, + *, + cv_data: Optional[AllowedReturnnTrainingDataInput] = None, + additional_data: Optional[Dict[str, AllowedReturnnTrainingDataInput]] = None, + register_output: bool = True, +) -> returnn.ReturnnTrainingJob: + assert isinstance(returnn_config, returnn.ReturnnConfig) + + config = copy.deepcopy(returnn_config) + + config.config["train"] = train_data if isinstance(train_data, Dict) else train_data.get_data_dict() + if cv_data is not None: + config.config["dev"] = cv_data if isinstance(cv_data, Dict) else cv_data.get_data_dict() + if additional_data is not None: + config.config["eval_datasets"] = {} + for name, data in additional_data.items(): + config.config["eval_datasets"][name] = data if isinstance(data, Dict) else data.get_data_dict() + returnn_training_job = returnn.ReturnnTrainingJob( + returnn_config=config, + **asdict(training_args) if isinstance(training_args, ReturnnTrainingJobArgs) else training_args, + ) + if register_output: + returnn_training_job.add_alias(f"nn_train/{name}") + tk.register_output(f"nn_train/{name}_learning_rates.png", returnn_training_job.out_plot_lr) + + return returnn_training_job diff --git a/common/setups/rasr/util/nn.py b/common/setups/rasr/util/nn.py deleted file mode 100644 index 2e3d1b175..000000000 --- a/common/setups/rasr/util/nn.py +++ /dev/null @@ -1,438 
+0,0 @@ -__all__ = [ - "ReturnnRasrTrainingArgs", - "ReturnnRasrDataInput", - "OggZipHdfDataInput", - "HybridArgs", - "NnRecogArgs", - "NnForcedAlignArgs", -] - -import copy -from dataclasses import dataclass, asdict -from typing import Any, Dict, List, Optional, Tuple, Type, TypedDict, Union - -from sisyphus import tk -from sisyphus.delayed_ops import DelayedFormat - -import i6_core.am as am -import i6_core.rasr as rasr -import i6_core.returnn as returnn - -from i6_core.util import MultiPath - -from .rasr import RasrDataInput - -RasrCacheTypes = Union[tk.Path, str, MultiPath, rasr.FlagDependentFlowAttribute] - - -@dataclass(frozen=True) -class ReturnnRasrTrainingArgs: - """ - Options for writing a RASR training config. See `ReturnnRasrTrainingJob`. - Most of them may be disregarded, i.e. the defaults can be left untouched. - - :param partition_epochs: if >1, split the full dataset into multiple sub-epochs - :param num_classes: number of classes - :param disregarded_classes: path to file with list of disregarded classes - :param class_label_file: path to file with class labels - :param buffer_size: buffer size for data loading - :param extra_rasr_config: extra RASR config - :param extra_rasr_post_config: extra RASR post config - :param use_python_control: whether to use python control, usually True - """ - - partition_epochs: Optional[int] = None - num_classes: Optional[int] = None - disregarded_classes: Optional[tk.Path] = None - class_label_file: Optional[tk.Path] = None - buffer_size: int = 200 * 1024 - extra_rasr_config: Optional[rasr.RasrConfig] = None - extra_rasr_post_config: Optional[rasr.RasrConfig] = None - use_python_control: bool = True - - -class ReturnnRasrDataInput: - """ - Holds the data for ReturnnRasrTrainingJob. 
- """ - - def __init__( - self, - name: str, - crp: Optional[rasr.CommonRasrParameters] = None, - alignments: Optional[RasrCacheTypes] = None, - feature_flow: Optional[Union[rasr.FlowNetwork, Dict[str, rasr.FlowNetwork]]] = None, - features: Optional[Union[RasrCacheTypes, Dict[str, RasrCacheTypes]]] = None, - acoustic_mixtures: Optional[Union[tk.Path, str]] = None, - feature_scorers: Optional[Dict[str, Type[rasr.FeatureScorer]]] = None, - shuffle_data: bool = True, - shuffling_parameters: Optional[Dict[str, Any]] = None, - stm: Optional[tk.Path] = None, - glm: Optional[tk.Path] = None, - returnn_rasr_training_args: Optional[ReturnnRasrTrainingArgs] = None, - **kwargs, - ): - """ - - :param name: name of the data - :param crp: common RASR parameters - :param alignments: RASR cache of an alignment - :param feature_flow: acoustic feature flow network or dict of feature flow networks - :param features: RASR cache of acoustic features - :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) - :param feature_scorers: RASR feature scorers - :param shuffle_data: shuffle training segments into bins of similar length. The bins are sorted by length. 
- :param shuffling_parameters: Dict of additional parameters to set for shuffling, - currently only 'segment_order_sort_by_time_length_chunk_size' is supported - :param stm: stm file for scoring - :param glm: glm file for scoring - :param returnn_rasr_training_args: arguments for RETURNN training with RASR - """ - self.name = name - self.crp = crp - self.alignments = alignments - self.feature_flow = feature_flow - self.features = features - self.acoustic_mixtures = acoustic_mixtures - self.feature_scorers = feature_scorers - self.shuffle_data = shuffle_data - self.shuffling_parameters = shuffling_parameters - if shuffle_data and self.shuffling_parameters is None: - # apply the legacy defaults if shuffling_parameters is not set - self.shuffling_parameters = {"segment_order_sort_by_time_length_chunk_size": 384} - self.stm = stm - self.glm = glm - self.returnn_rasr_training_args = returnn_rasr_training_args or ReturnnRasrTrainingArgs() - - def get_training_feature_flow_file(self) -> tk.Path: - """Returns the feature flow file for the RETURNN training with RASR.""" - feature_flow = returnn.ReturnnRasrTrainingJob.create_flow(self.feature_flow, self.alignments) - write_feature_flow = rasr.WriteFlowNetworkJob(feature_flow) - return write_feature_flow.out_flow_file - - def get_training_rasr_config_file(self) -> tk.Path: - """Returns the RASR config file for the RETURNN training with RASR.""" - config, post_config = returnn.ReturnnRasrTrainingJob.create_config( - self.crp, self.alignments, **asdict(self.returnn_rasr_training_args) - ) - config.neural_network_trainer.feature_extraction.file = self.get_training_feature_flow_file() - write_rasr_config = rasr.WriteRasrConfigJob(config, post_config) - return write_rasr_config.out_config - - def get_data_dict(self) -> Dict[str, Union[str, DelayedFormat, tk.Path]]: - """Returns the data dict for the ExternSprintDataset to be used in a training ReturnnConfig.""" - config_file = self.get_training_rasr_config_file() - config_str = 
DelayedFormat("--config={} --*.LOGFILE=nn-trainer.{}.log --*.TASK=1", config_file, self.name) - dataset = { - "class": "ExternSprintDataset", - "sprintTrainerExecPath": rasr.RasrCommand.select_exe(self.crp.nn_trainer_exe, "nn-trainer"), - "sprintConfigStr": config_str, - } - partition_epochs = self.returnn_rasr_training_args.partition_epochs - if partition_epochs is not None: - dataset["partitionEpoch"] = partition_epochs - return dataset - - def build_crp( - self, - am_args, - corpus_object, - concurrent, - segment_path, - lexicon_args, - cart_tree_path=None, - allophone_file=None, - lm_args=None, - ): - """ - constructs and returns a CommonRasrParameters from the given settings and files - """ - crp = rasr.CommonRasrParameters() - rasr.crp_add_default_output(crp) - crp.acoustic_model_config = am.acoustic_model_config(**am_args) - rasr.crp_set_corpus(crp, corpus_object) - crp.concurrent = concurrent - crp.segment_path = segment_path - - crp.lexicon_config = rasr.RasrConfig() - crp.lexicon_config.file = lexicon_args["filename"] - crp.lexicon_config.normalize_pronunciation = lexicon_args["normalize_pronunciation"] - - if "add_from_lexicon" in lexicon_args: - crp.acoustic_model_config.allophones.add_from_lexicon = lexicon_args["add_from_lexicon"] - if "add_all" in lexicon_args: - crp.acoustic_model_config.allophones.add_all = lexicon_args["add_all"] - - if cart_tree_path is not None: - crp.acoustic_model_config.state_tying.type = "cart" - crp.acoustic_model_config.state_tying.file = cart_tree_path - - if lm_args is not None: - crp.language_model_config = rasr.RasrConfig() - crp.language_model_config.type = lm_args["type"] - crp.language_model_config.file = lm_args["filename"] - crp.language_model_config.scale = lm_args["scale"] - - if allophone_file is not None: - crp.acoustic_model_config.allophones.add_from_file = allophone_file - - self.crp = crp - - def update_crp_with_shuffle_parameters(self): - if self.shuffle_data: - 
self.crp.corpus_config.segment_order_shuffle = True - if self.shuffling_parameters is not None: - if "segment_order_sort_by_time_length_chunk_size" in self.shuffling_parameters: - self.crp.corpus_config.segment_order_sort_by_time_length = True - self.crp.corpus_config.segment_order_sort_by_time_length_chunk_size = self.shuffling_parameters[ - "segment_order_sort_by_time_length_chunk_size" - ] - - def update_crp_with( - self, - *, - corpus_file: Optional[tk.Path] = None, - audio_dir: Optional[Union[str, tk.Path]] = None, - corpus_duration: Optional[int] = None, - segment_path: Optional[Union[str, tk.Path]] = None, - concurrent: Optional[int] = None, - shuffle_data: Optional[bool] = None, - shuffling_parameters: Optional[Dict[str, Any]] = None, - ): - if corpus_file is not None: - self.crp.corpus_config.file = corpus_file - if audio_dir is not None: - self.crp.corpus_config.audio_dir = audio_dir - if corpus_duration is not None: - self.crp.corpus_duration = corpus_duration - if segment_path is not None: - self.crp.segment_path = segment_path - if concurrent is not None: - self.crp.concurrent = concurrent - if shuffle_data is not None: - self.shuffle_data = shuffle_data - if shuffling_parameters is not None: - assert self.shuffle_data, "You need to set shuffle_data to true when using shuffling_parameters" - self.shuffling_parameters = shuffling_parameters - self.update_crp_with_shuffle_parameters() - - def get_crp(self, **kwargs) -> rasr.CommonRasrParameters: - """ - constructs and returns a CommonRasrParameters from the given settings and files - :rtype CommonRasrParameters: - """ - if self.crp is None: - self.build_crp(**kwargs) - - if self.shuffle_data: - self.update_crp_with_shuffle_parameters() - - return self.crp - - -class OggZipHdfDataInput: - def __init__( - self, - oggzip_files: List[tk.Path], - alignments: List[tk.Path], - audio: Dict, - partition_epoch: int = 1, - seq_ordering: str = "laplace:.1000", - meta_args: Optional[Dict[str, Any]] = None, - 
ogg_args: Optional[Dict[str, Any]] = None, - hdf_args: Optional[Dict[str, Any]] = None, - acoustic_mixtures: Optional[tk.Path] = None, - ): - """ - :param oggzip_files: zipped ogg files which contain the audio - :param alignments: hdf files which contain dumped RASR alignments - :param audio: e.g. {"features": "raw", "sample_rate": 16000} for raw waveform input with a sample rate of 16 kHz - :param partition_epoch: if >1, split the full dataset into multiple sub-epochs - :param seq_ordering: sort the sequences in the dataset, e.g. "random" or "laplace:.100" - :param meta_args: parameters for the `MetaDataset` - :param ogg_args: parameters for the `OggZipDataset` - :param hdf_args: parameters for the `HdfDataset` - :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) - """ - self.oggzip_files = oggzip_files - self.alignments = alignments - self.audio = audio - self.partition_epoch = partition_epoch - self.seq_ordering = seq_ordering - self.meta_args = meta_args - self.ogg_args = ogg_args - self.hdf_args = hdf_args - self.acoustic_mixtures = acoustic_mixtures - - def get_data_dict(self): - return { - "class": "MetaDataset", - "data_map": {"classes": ("hdf", "classes"), "data": ("ogg", "data")}, - "datasets": { - "hdf": { - "class": "HDFDataset", - "files": self.alignments, - "use_cache_manager": True, - **(self.hdf_args or {}), - }, - "ogg": { - "class": "OggZipDataset", - "audio": self.audio, - "partition_epoch": self.partition_epoch, - "path": self.oggzip_files, - "seq_ordering": self.seq_ordering, - "use_cache_manager": True, - **(self.ogg_args or {}), - }, - }, - "seq_order_control_dataset": "ogg", - **(self.meta_args or {}), - } - - -# Attribute names are invalid identifiers, therefore use old syntax -SearchParameters = TypedDict( - "SearchParameters", - { - "beam-pruning": float, - "beam-pruning-limit": float, - "lm-state-pruning": Optional[float], - "word-end-pruning": float, - "word-end-pruning-limit": 
float, - }, -) - - -class LookaheadOptions(TypedDict): - cache_high: Optional[int] - cache_low: Optional[int] - history_limit: Optional[int] - laziness: Optional[int] - minimum_representation: Optional[int] - tree_cutoff: Optional[int] - - -class LatticeToCtmArgs(TypedDict): - best_path_algo: Optional[str] - encoding: Optional[str] - extra_config: Optional[Any] - extra_post_config: Optional[Any] - fill_empty_segments: Optional[bool] - - -class NnRecogArgs(TypedDict): - acoustic_mixture_path: Optional[tk.Path] - checkpoints: Optional[Dict[int, returnn.Checkpoint]] - create_lattice: Optional[bool] - epochs: Optional[List[int]] - eval_best_in_lattice: Optional[bool] - eval_single_best: Optional[bool] - feature_flow_key: str - lattice_to_ctm_kwargs: Optional[LatticeToCtmArgs] - lm_lookahead: bool - lm_scales: List[float] - lookahead_options: Optional[LookaheadOptions] - mem: int - name: str - optimize_am_lm_scale: bool - parallelize_conversion: Optional[bool] - prior_scales: List[float] - pronunciation_scales: List[float] - returnn_config: Optional[returnn.ReturnnConfig] - rtf: int - search_parameters: Optional[SearchParameters] - use_gpu: Optional[bool] - - -KeyedRecogArgsType = Dict[str, Union[Dict[str, Any], NnRecogArgs]] - - -class EpochPartitioning(TypedDict): - dev: int - train: int - - -class NnTrainingArgs(TypedDict): - buffer_size: Optional[int] - class_label_file: Optional[tk.Path] - cpu_rqmt: Optional[int] - device: Optional[str] - disregarded_classes: Optional[Any] - extra_rasr_config: Optional[rasr.RasrConfig] - extra_rasr_post_config: Optional[rasr.RasrConfig] - horovod_num_processes: Optional[int] - keep_epochs: Optional[bool] - log_verbosity: Optional[int] - mem_rqmt: Optional[int] - num_classes: int - num_epochs: int - partition_epochs: Optional[EpochPartitioning] - save_interval: Optional[int] - time_rqmt: Optional[int] - use_python_control: Optional[bool] - - -class HybridArgs: - def __init__( - self, - returnn_training_configs: Dict[str, 
returnn.ReturnnConfig], - returnn_recognition_configs: Dict[str, returnn.ReturnnConfig], - training_args: Union[Dict[str, Any], NnTrainingArgs], - recognition_args: KeyedRecogArgsType, - test_recognition_args: Optional[KeyedRecogArgsType] = None, - ): - """ - ################################################## - :param returnn_training_configs - RETURNN config keyed by training corpus. - ################################################## - :param returnn_recognition_configs - If a config is not found here, the corresponding training config is used - ################################################## - :param training_args: - ################################################## - :param recognition_args: - Configuration for recognition on dev corpora. - ################################################## - :param test_recognition_args: - Additional configuration for recognition on test corpora. Merged with recognition_args. - ################################################## - """ - self.returnn_training_configs = returnn_training_configs - self.returnn_recognition_configs = returnn_recognition_configs - self.training_args = training_args - self.recognition_args = recognition_args - self.test_recognition_args = test_recognition_args - - -@dataclass() -class NnRecogArgs: - name: str - returnn_config: returnn.ReturnnConfig - checkpoints: Dict[int, returnn.Checkpoint] - acoustic_mixture_path: tk.Path - prior_scales: List[float] - pronunciation_scales: List[float] - lm_scales: List[float] - optimize_am_lm_scale: bool - feature_flow_key: str - search_parameters: Dict - lm_lookahead: bool - lattice_to_ctm_kwargs: Dict - parallelize_conversion: bool - rtf: int - mem: int - lookahead_options: Optional[Dict] = None - epochs: Optional[List[int]] = None - native_ops: Optional[List[str]] = None - - -class NnForcedAlignArgs(TypedDict): - name: str - target_corpus_keys: List[str] - feature_scorer_corpus_key: str - scorer_model_key: Union[str, List[str], Tuple[str], 
rasr.FeatureScorer] - epoch: int - base_flow_key: str - tf_flow_key: str - dump_alignment: bool diff --git a/common/setups/rasr/util/nn/__init__.py b/common/setups/rasr/util/nn/__init__.py new file mode 100644 index 000000000..241cef3d3 --- /dev/null +++ b/common/setups/rasr/util/nn/__init__.py @@ -0,0 +1,4 @@ +from .common import * +from .data import * +from .decode import * +from .training import * diff --git a/common/setups/rasr/util/nn/common.py b/common/setups/rasr/util/nn/common.py new file mode 100644 index 000000000..73a573c2c --- /dev/null +++ b/common/setups/rasr/util/nn/common.py @@ -0,0 +1,53 @@ +__all__ = ["HybridArgs", "NnForcedAlignArgs"] + +from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union + +import i6_core.rasr as rasr +import i6_core.returnn as returnn + +from .decode import KeyedRecogArgsType +from .training import ReturnnRasrTrainingArgs, ReturnnTrainingJobArgs + + +class HybridArgs: + def __init__( + self, + returnn_training_configs: Dict[str, returnn.ReturnnConfig], + returnn_recognition_configs: Dict[str, returnn.ReturnnConfig], + training_args: Union[Dict[str, Any], ReturnnRasrTrainingArgs, ReturnnTrainingJobArgs], + recognition_args: KeyedRecogArgsType, + test_recognition_args: Optional[KeyedRecogArgsType] = None, + ): + """ + ################################################## + :param returnn_training_configs + RETURNN config keyed by training corpus. + ################################################## + :param returnn_recognition_configs + If a config is not found here, the corresponding training config is used + ################################################## + :param training_args: + ################################################## + :param recognition_args: + Configuration for recognition on dev corpora. + ################################################## + :param test_recognition_args: + Additional configuration for recognition on test corpora. Merged with recognition_args. 
+ ################################################## + """ + self.returnn_training_configs = returnn_training_configs + self.returnn_recognition_configs = returnn_recognition_configs + self.training_args = training_args + self.recognition_args = recognition_args + self.test_recognition_args = test_recognition_args + + +class NnForcedAlignArgs(TypedDict): + name: str + target_corpus_keys: List[str] + feature_scorer_corpus_key: str + scorer_model_key: Union[str, List[str], Tuple[str], rasr.FeatureScorer] + epoch: int + base_flow_key: str + tf_flow_key: str + dump_alignment: bool diff --git a/common/setups/rasr/util/nn/data.py b/common/setups/rasr/util/nn/data.py new file mode 100644 index 000000000..f70471195 --- /dev/null +++ b/common/setups/rasr/util/nn/data.py @@ -0,0 +1,615 @@ +__all__ = [ + "RasrDataInput", + "ReturnnRasrTrainingArgs", + "ReturnnRasrDataInput", + "AllophoneLabeling", + "OggZipRasrCacheDataInput", + "OggZipExternRasrDataInput", + "OggZipHdfDataInput", + "HdfDataInput", + "NextGenHdfDataInput", + "ReturnnRawAlignmentHdfTrainingDataInput", + "AllowedReturnnTrainingDataInput", +] + +import copy +from dataclasses import dataclass, asdict +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +from sisyphus import tk +from sisyphus.delayed_ops import DelayedFormat, DelayedBase + +import i6_core.am as am +import i6_core.rasr as rasr +import i6_core.returnn as returnn + +from i6_core.returnn.hdf import BlissToPcmHDFJob, RasrAlignmentDumpHDFJob +from i6_core.util import MultiPath + +RasrCacheTypes = Union[tk.Path, str, MultiPath, rasr.FlagDependentFlowAttribute, rasr.FlowNetwork] + + +@dataclass(frozen=True) +class RasrDataInput: + features: RasrCacheTypes + + +@dataclass(frozen=True) +class ReturnnRasrTrainingArgs: + """ + Options for writing a RASR training config. See `ReturnnRasrTrainingJob`. + Most of them may be disregarded, i.e. the defaults can be left untouched. 
+ + :param partition_epochs: if >1, split the full dataset into multiple sub-epochs + :param num_classes: number of classes + :param disregarded_classes: path to file with list of disregarded classes + :param class_label_file: path to file with class labels + :param buffer_size: buffer size for data loading + :param extra_rasr_config: extra RASR config + :param extra_rasr_post_config: extra RASR post config + :param use_python_control: whether to use python control, usually True + """ + + partition_epochs: Optional[int] = None + num_classes: Optional[int] = None + disregarded_classes: Optional[tk.Path] = None + class_label_file: Optional[tk.Path] = None + buffer_size: int = 200 * 1024 + extra_rasr_config: Optional[rasr.RasrConfig] = None + extra_rasr_post_config: Optional[rasr.RasrConfig] = None + use_python_control: bool = True + + +class ReturnnRasrDataInput: + """ + Holds the data for ReturnnRasrTrainingJob. + """ + + def __init__( + self, + name: str, + crp: Optional[rasr.CommonRasrParameters] = None, + alignments: Optional[RasrCacheTypes] = None, + feature_flow: Optional[Union[rasr.FlowNetwork, Dict[str, rasr.FlowNetwork]]] = None, + features: Optional[Union[RasrCacheTypes, Dict[str, RasrCacheTypes]]] = None, + acoustic_mixtures: Optional[Union[tk.Path, str]] = None, + feature_scorers: Optional[Dict[str, Type[rasr.FeatureScorer]]] = None, + shuffle_data: bool = True, + shuffling_parameters: Optional[Dict[str, Any]] = None, + stm: Optional[tk.Path] = None, + glm: Optional[tk.Path] = None, + returnn_rasr_training_args: Optional[ReturnnRasrTrainingArgs] = None, + **kwargs, + ): + """ + + :param name: name of the data + :param crp: common RASR parameters + :param alignments: RASR cache of an alignment + :param feature_flow: acoustic feature flow network or dict of feature flow networks + :param features: RASR cache of acoustic features + :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) + :param 
feature_scorers: RASR feature scorers + :param shuffle_data: shuffle training segments into bins of similar length. The bins are sorted by length. + :param shuffling_parameters: Dict of additional parameters to set for shuffling, + currently only 'segment_order_sort_by_time_length_chunk_size' is supported + :param stm: stm file for scoring + :param glm: glm file for scoring + :param returnn_rasr_training_args: arguments for RETURNN training with RASR + """ + self.name = name + self.crp = crp + self.alignments = alignments + self.feature_flow = feature_flow + self.features = features + self.acoustic_mixtures = acoustic_mixtures + self.feature_scorers = feature_scorers + self.shuffle_data = shuffle_data + self.shuffling_parameters = shuffling_parameters + if shuffle_data and self.shuffling_parameters is None: + # apply the legacy defaults if shuffling_parameters is not set + self.shuffling_parameters = {"segment_order_sort_by_time_length_chunk_size": 384} + self.stm = stm + self.glm = glm + self.returnn_rasr_training_args = returnn_rasr_training_args or ReturnnRasrTrainingArgs() + + def get_training_feature_flow_file(self) -> tk.Path: + """Returns the feature flow file for the RETURNN training with RASR.""" + feature_flow = returnn.ReturnnRasrTrainingJob.create_flow(self.feature_flow, self.alignments) + write_feature_flow = rasr.WriteFlowNetworkJob(feature_flow) + return write_feature_flow.out_flow_file + + def get_training_rasr_config_file(self) -> tk.Path: + """Returns the RASR config file for the RETURNN training with RASR.""" + config, post_config = returnn.ReturnnRasrTrainingJob.create_config( + self.crp, self.alignments, **asdict(self.returnn_rasr_training_args) + ) + config.neural_network_trainer.feature_extraction.file = self.get_training_feature_flow_file() + write_rasr_config = rasr.WriteRasrConfigJob(config, post_config) + return write_rasr_config.out_config + + def get_data_dict(self) -> Dict[str, Union[str, DelayedFormat, tk.Path]]: + """Returns the data 
dict for the ExternSprintDataset to be used in a training ReturnnConfig.""" + config_file = self.get_training_rasr_config_file() + config_str = DelayedFormat("--config={} --*.LOGFILE=nn-trainer.{}.log --*.TASK=1", config_file, self.name) + dataset = { + "class": "ExternSprintDataset", + "sprintTrainerExecPath": rasr.RasrCommand.select_exe(self.crp.nn_trainer_exe, "nn-trainer"), + "sprintConfigStr": config_str, + } + partition_epochs = self.returnn_rasr_training_args.partition_epochs + if partition_epochs is not None: + dataset["partitionEpoch"] = partition_epochs + return dataset + + def build_crp( + self, + am_args, + corpus_object, + concurrent, + segment_path, + lexicon_args, + cart_tree_path=None, + allophone_file=None, + lm_args=None, + ): + """ + constructs and returns a CommonRasrParameters from the given settings and files + """ + crp = rasr.CommonRasrParameters() + rasr.crp_add_default_output(crp) + crp.acoustic_model_config = am.acoustic_model_config(**am_args) + rasr.crp_set_corpus(crp, corpus_object) + crp.concurrent = concurrent + crp.segment_path = segment_path + + crp.lexicon_config = rasr.RasrConfig() + crp.lexicon_config.file = lexicon_args["filename"] + crp.lexicon_config.normalize_pronunciation = lexicon_args["normalize_pronunciation"] + + if "add_from_lexicon" in lexicon_args: + crp.acoustic_model_config.allophones.add_from_lexicon = lexicon_args["add_from_lexicon"] + if "add_all" in lexicon_args: + crp.acoustic_model_config.allophones.add_all = lexicon_args["add_all"] + + if cart_tree_path is not None: + crp.acoustic_model_config.state_tying.type = "cart" + crp.acoustic_model_config.state_tying.file = cart_tree_path + + if lm_args is not None: + crp.language_model_config = rasr.RasrConfig() + crp.language_model_config.type = lm_args["type"] + crp.language_model_config.file = lm_args["filename"] + crp.language_model_config.scale = lm_args["scale"] + + if allophone_file is not None: + crp.acoustic_model_config.allophones.add_from_file = 
allophone_file + + self.crp = crp + + def update_crp_with_shuffle_parameters(self): + if self.shuffle_data: + self.crp.corpus_config.segment_order_shuffle = True + if self.shuffling_parameters is not None: + if "segment_order_sort_by_time_length_chunk_size" in self.shuffling_parameters: + self.crp.corpus_config.segment_order_sort_by_time_length = True + self.crp.corpus_config.segment_order_sort_by_time_length_chunk_size = self.shuffling_parameters[ + "segment_order_sort_by_time_length_chunk_size" + ] + + def update_crp_with( + self, + *, + corpus_file: Optional[tk.Path] = None, + audio_dir: Optional[Union[str, tk.Path]] = None, + corpus_duration: Optional[int] = None, + segment_path: Optional[Union[str, tk.Path]] = None, + concurrent: Optional[int] = None, + shuffle_data: Optional[bool] = None, + shuffling_parameters: Optional[Dict[str, Any]] = None, + ): + if corpus_file is not None: + self.crp.corpus_config.file = corpus_file + if audio_dir is not None: + self.crp.corpus_config.audio_dir = audio_dir + if corpus_duration is not None: + self.crp.corpus_duration = corpus_duration + if segment_path is not None: + self.crp.segment_path = segment_path + if concurrent is not None: + self.crp.concurrent = concurrent + if shuffle_data is not None: + self.shuffle_data = shuffle_data + if shuffling_parameters is not None: + assert self.shuffle_data, "You need to set shuffle_data to true when using shuffling_parameters" + self.shuffling_parameters = shuffling_parameters + self.update_crp_with_shuffle_parameters() + + def get_crp(self, **kwargs) -> rasr.CommonRasrParameters: + """ + constructs and returns a CommonRasrParameters from the given settings and files + :rtype CommonRasrParameters: + """ + if self.crp is None: + self.build_crp(**kwargs) + + if self.shuffle_data: + self.update_crp_with_shuffle_parameters() + + return self.crp + + +@dataclass() +class AllophoneLabeling: + silence_phone: str + allophone_file: Union[tk.Path, DelayedBase] + phoneme_file: 
Optional[Union[tk.Path, DelayedBase]] = None + state_tying_file: Optional[Union[tk.Path, DelayedBase]] = None + + +class OggZipRasrCacheDataInput: + def __init__( + self, + oggzip_files: List[tk.Path], + audio: Dict, + alignment_file: tk.Path, + allophone_labeling: AllophoneLabeling, + partition_epoch: int = 1, + seq_ordering: str = "laplace:.1000", + *, + meta_args: Optional[Dict[str, Any]] = None, + ogg_args: Optional[Dict[str, Any]] = None, + rasr_args: Optional[Dict[str, Any]] = None, + acoustic_mixtures: Optional[tk.Path] = None, + ): + """ + :param oggzip_files: zipped ogg files which contain the audio + :param audio: e.g. {"features": "raw", "sample_rate": 16000} for raw waveform input with a sample rate of 16 kHz + :param alignment_file: hdf files which contain dumped RASR alignments + :param allophone_labeling: labels for the RASR alignments + :param partition_epoch: if >1, split the full dataset into multiple sub-epochs + :param seq_ordering: sort the sequences in the dataset, e.g. 
"random" or "laplace:.100" + :param meta_args: parameters for the `MetaDataset` + :param ogg_args: parameters for the `OggZipDataset` + :param rasr_args: parameters for the `SprintCacheDataset` + :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) + """ + self.oggzip_files = oggzip_files + self.audio = audio + self.alignment_file = alignment_file + self.allophone_labeling = allophone_labeling + self.partition_epoch = partition_epoch + self.seq_ordering = seq_ordering + self.meta_args = meta_args + self.ogg_args = ogg_args + self.rasr_args = rasr_args + self.acoustic_mixtures = acoustic_mixtures + + def get_data_dict(self): + return { + "class": "MetaDataset", + "data_map": {"classes": ("rasr", "classes"), "data": ("ogg", "data")}, + "datasets": { + "rasr": { + "class": "SprintCacheDataset", + "data": { + "classes": { + "filename": self.alignment_file, + "data_type": "align", + "allophone_labeling": asdict(self.allophone_labeling), + }, + }, + "use_cache_manager": True, + **(self.rasr_args or {}), + }, + "ogg": { + "class": "OggZipDataset", + "audio": self.audio, + "path": self.oggzip_files, + "use_cache_manager": True, + **(self.ogg_args or {}), + }, + }, + "partition_epoch": self.partition_epoch, + "seq_ordering": self.seq_ordering, + **(self.meta_args or {}), + } + + +class OggZipExternRasrDataInput: + def __init__( + self, + oggzip_files: List[tk.Path], + audio: Dict, + alignment_file: tk.Path, + rasr_exe: tk.Path, + rasr_config_str: str, + partition_epoch: int = 1, + seq_ordering: str = "laplace:.1000", + reduce_target_factor: int = 1, + *, + meta_args: Optional[Dict[str, Any]] = None, + ogg_args: Optional[Dict[str, Any]] = None, + rasr_args: Optional[Dict[str, Any]] = None, + acoustic_mixtures: Optional[tk.Path] = None, + ): + """ + :param oggzip_files: zipped ogg files which contain the audio + :param audio: e.g. 
{"features": "raw", "sample_rate": 16000} for raw waveform input with a sample rate of 16 kHz + :param alignment_file: hdf files which contain dumped RASR alignments + :param rasr_exe: path to RASR NN trainer executable + :param rasr_config_str: str of rasr parameters + :param partition_epoch: if >1, split the full dataset into multiple sub-epochs + :param seq_ordering: sort the sequences in the dataset, e.g. "random" or "laplace:.100" + :param reduce_target_factor: reduce the alignment by a factor + :param meta_args: parameters for the `MetaDataset` + :param ogg_args: parameters for the `OggZipDataset` + :param rasr_args: parameters for the `SprintCacheDataset` + :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) + """ + self.oggzip_files = oggzip_files + self.audio = audio + self.alignment_file = alignment_file + self.rasr_exe = rasr_exe + self.rasr_config_str = rasr_config_str + self.partition_epoch = partition_epoch + self.seq_ordering = seq_ordering + self.reduce_target_factor = reduce_target_factor + self.meta_args = meta_args + self.ogg_args = ogg_args + self.rasr_args = rasr_args + self.acoustic_mixtures = acoustic_mixtures + + def get_data_dict(self): + return { + "class": "MetaDataset", + "data_map": {"classes": ("rasr", "classes"), "data": ("ogg", "data")}, + "datasets": { + "rasr": { + "class": "SprintCacheDataset", + "sprintConfigSts": self.rasr_config_str, + "sprintTrainerExecPath": self.rasr_exe, + "partition_epoch": self.partition_epoch, + "suppress_load_seqs_print": True, + "reduce_target_factor": self.reduce_target_factor, + **(self.rasr_args or {}), + }, + "ogg": { + "class": "OggZipDataset", + "audio": self.audio, + "path": self.oggzip_files, + "use_cache_manager": True, + **(self.ogg_args or {}), + }, + }, + "seq_order_control_dataset": "rasr", + **(self.meta_args or {}), + } + + +class OggZipHdfDataInput: + def __init__( + self, + oggzip_files: List[tk.Path], + alignments: 
List[tk.Path], + audio: Dict, + partition_epoch: int = 1, + seq_ordering: str = "laplace:.1000", + meta_args: Optional[Dict[str, Any]] = None, + ogg_args: Optional[Dict[str, Any]] = None, + hdf_args: Optional[Dict[str, Any]] = None, + acoustic_mixtures: Optional[tk.Path] = None, + ): + """ + :param oggzip_files: zipped ogg files which contain the audio + :param alignments: hdf files which contain dumped RASR alignments + :param audio: e.g. {"features": "raw", "sample_rate": 16000} for raw waveform input with a sample rate of 16 kHz + :param partition_epoch: if >1, split the full dataset into multiple sub-epochs + :param seq_ordering: sort the sequences in the dataset, e.g. "random" or "laplace:.100" + :param meta_args: parameters for the `MetaDataset` + :param ogg_args: parameters for the `OggZipDataset` + :param hdf_args: parameters for the `HdfDataset` + :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) + """ + self.oggzip_files = oggzip_files + self.alignments = alignments + self.audio = audio + self.partition_epoch = partition_epoch + self.seq_ordering = seq_ordering + self.meta_args = meta_args + self.ogg_args = ogg_args + self.hdf_args = hdf_args + self.acoustic_mixtures = acoustic_mixtures + + def get_data_dict(self): + return { + "class": "MetaDataset", + "data_map": {"classes": ("hdf", "classes"), "data": ("ogg", "data")}, + "datasets": { + "hdf": { + "class": "HDFDataset", + "files": self.alignments, + "use_cache_manager": True, + **(self.hdf_args or {}), + }, + "ogg": { + "class": "OggZipDataset", + "audio": self.audio, + "partition_epoch": self.partition_epoch, + "path": self.oggzip_files, + "seq_ordering": self.seq_ordering, + "use_cache_manager": True, + **(self.ogg_args or {}), + }, + }, + "seq_order_control_dataset": "ogg", + **(self.meta_args or {}), + } + + +class HdfDataInput: + def __init__( + self, + features: Union[tk.Path, List[tk.Path]], + alignments: Union[tk.Path, List[tk.Path]], 
+        partition_epoch: int = 1,
+        seq_ordering: str = "laplace:.1000",
+        *,
+        meta_args: Optional[Dict[str, Any]] = None,
+        align_args: Optional[Dict[str, Any]] = None,
+        feat_args: Optional[Dict[str, Any]] = None,
+        acoustic_mixtures: Optional[tk.Path] = None,
+        segment_file: Optional[tk.Path] = None,
+    ):
+        """
+        :param features: hdf files which contain raw wave form or features, like GT or MFCC
+        :param alignments: hdf files which contain dumped RASR alignments
+        :param partition_epoch: if >1, split the full dataset into multiple sub-epochs
+        :param seq_ordering: sort the sequences in the dataset, e.g. "random" or "laplace:.100"
+        :param meta_args: parameters for the `MetaDataset`
+        :param align_args: parameters for the `HDFDataset` for the alignments
+        :param feat_args: parameters for the `HDFDataset` for the features
+        :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training)
+        :param segment_file: path to the segment file which defines which segments from corpus to use
+        """
+        self.features = features
+        self.alignments = alignments
+        self.partition_epoch = partition_epoch
+        self.seq_ordering = seq_ordering
+        self.meta_args = meta_args
+        self.align_args = align_args
+        self.feat_args = feat_args
+        self.acoustic_mixtures = acoustic_mixtures
+        self.segment_file = segment_file
+
+        from returnn_common.datasets import MetaDataset, HDFDataset
+
+        self.align_dataset = HDFDataset(
+            files=self.alignments,
+            seq_ordering=self.seq_ordering,
+            partition_epoch=self.partition_epoch,
+            segment_file=self.segment_file,
+            **(self.align_args or {}),
+        )
+        self.feature_dataset = HDFDataset(files=self.features, **(self.feat_args or {}))
+        self.meta_dataset = MetaDataset(
+            data_map={"classes": ("align", "data"), "data": ("feat", "data")},
+            datasets={"align": self.align_dataset, "feat": self.feature_dataset},
+            seq_order_control_dataset="align",
+            additional_options={**(self.meta_args or {})},
+        )
+
+    def get_data_dict(self):
+        return
self.meta_dataset.as_returnn_opts() + + def get_dataset_object(self): + return self.meta_dataset + + +class NextGenHdfDataInput: + def __init__( + self, + streams: Dict[str, List[tk.Path]], + data_map: Dict[str, Tuple[str, str]], + partition_epoch: int = 1, + seq_ordering: str = "laplace:.1000", + *, + meta_args: Optional[Dict[str, Any]] = None, + stream_args: Optional[Dict[str, Dict[str, Any]]] = None, + acoustic_mixtures: Optional[tk.Path] = None, + ): + """ + :param streams: `NextGenHDFDataset` for different data streams + :param data_map: a data map specifying the connection between the data stored in the HDF and RETURNN. + Key is the RETURNN name, first value is the name in the `datasets` from `MetaDataset`, + second value the name in the HDF. + :param partition_epoch: if >1, split the full dataset into multiple sub-epochs + :param seq_ordering: sort the sequences in the dataset, e.g. "random" or "laplace:.100" + :param meta_args: parameters for the `MetaDataset` + :param stream_args: parameters for the different `NextGenHDFDataset` + :param acoustic_mixtures: path to a RASR acoustic mixture file (used in System classes, not RETURNN training) + """ + self.streams = streams + self.data_map = data_map + self.partition_epoch = partition_epoch + self.seq_ordering = seq_ordering + self.meta_args = meta_args + self.stream_args = stream_args + self.acoustic_mixtures = acoustic_mixtures + + assert sorted(list(streams.keys())) == sorted([x[0] for x in data_map.values()]) + + def get_data_dict(self): + d = { + "class": "MetaDataset", + "data_map": {}, + "datasets": {}, + "partition_epoch": self.partition_epoch, + "seq_ordering": self.seq_ordering, + **(self.meta_args or {}), + } + for k, v in self.data_map.items(): + d["data_map"][k] = v + + for k, v in self.streams.items(): + d["datasets"][k] = { + "class": "NextGenHDFDataset", + "files": v, + "use_cache_manager": True, + } + if self.stream_args is not None: + d["datasets"][k].update(**self.stream_args[k] or {}) + + 
return d + + +@dataclass() +class ReturnnRawAlignmentHdfTrainingDataInput: + bliss_corpus: tk.Path + alignment_caches: List[tk.Path] + state_tying_file: tk.Path + allophone_file: tk.Path + returnn_root: tk.Path + seq_ordering: str + + def get_data_dict(self): + raw_hdf_path = BlissToPcmHDFJob( + bliss_corpus=self.bliss_corpus, + returnn_root=self.returnn_root, + ).out_hdf + alignment_hdf_path = RasrAlignmentDumpHDFJob( + alignment_caches=self.alignment_caches, + allophone_file=self.allophone_file, + state_tying_file=self.state_tying_file, + returnn_root=self.returnn_root, + ).out_hdf_files + + data = { + "class": "MetaDataset", + "data_map": {"classes": ("alignments", "data"), "data": ("features", "data")}, + "datasets": { + "alignments": { + "class": "HDFDataset", + "files": alignment_hdf_path, + "seq_ordering": self.seq_ordering, + }, + "features": { + "class": "HDFDataset", + "files": [raw_hdf_path], + }, + }, + "seq_order_control_dataset": "alignments", + } + + return data + + +AllowedReturnnTrainingDataInput = Union[ + Dict, + OggZipRasrCacheDataInput, + OggZipExternRasrDataInput, + OggZipHdfDataInput, + NextGenHdfDataInput, + ReturnnRawAlignmentHdfTrainingDataInput, + HdfDataInput, +] diff --git a/common/setups/rasr/util/nn/decode.py b/common/setups/rasr/util/nn/decode.py new file mode 100644 index 000000000..e70c5256c --- /dev/null +++ b/common/setups/rasr/util/nn/decode.py @@ -0,0 +1,88 @@ +__all__ = ["SearchParameters", "LookaheadOptions", "LatticeToCtmArgs", "NnRecogArgs", "KeyedRecogArgsType"] + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, TypedDict, Union + +from sisyphus import tk + +import i6_core.returnn as returnn + +# Attribute names are invalid identifiers, therefore use old syntax +SearchParameters = TypedDict( + "SearchParameters", + { + "beam-pruning": float, + "beam-pruning-limit": float, + "lm-state-pruning": Optional[float], + "word-end-pruning": float, + "word-end-pruning-limit": float, + }, +) + + 
+class LookaheadOptions(TypedDict): + cache_high: Optional[int] + cache_low: Optional[int] + history_limit: Optional[int] + laziness: Optional[int] + minimum_representation: Optional[int] + tree_cutoff: Optional[int] + + +class LatticeToCtmArgs(TypedDict): + best_path_algo: Optional[str] + encoding: Optional[str] + extra_config: Optional[Any] + extra_post_config: Optional[Any] + fill_empty_segments: Optional[bool] + + +class NnRecogArgs(TypedDict): + acoustic_mixture_path: Optional[tk.Path] + checkpoints: Optional[Dict[int, returnn.Checkpoint]] + create_lattice: Optional[bool] + epochs: Optional[List[int]] + eval_best_in_lattice: Optional[bool] + eval_single_best: Optional[bool] + feature_flow_key: str + lattice_to_ctm_kwargs: Optional[LatticeToCtmArgs] + lm_lookahead: bool + lm_scales: List[float] + lookahead_options: Optional[LookaheadOptions] + mem: int + name: str + optimize_am_lm_scale: bool + parallelize_conversion: Optional[bool] + prior_scales: List[float] + pronunciation_scales: List[float] + returnn_config: Optional[returnn.ReturnnConfig] + rtf: int + search_parameters: Optional[SearchParameters] + use_gpu: Optional[bool] + + +@dataclass() +class NnRecogArgs: + name: str + returnn_config: returnn.ReturnnConfig + checkpoints: Dict[int, returnn.Checkpoint] + acoustic_mixture_path: tk.Path + prior_scales: List[float] + pronunciation_scales: List[float] + lm_scales: List[float] + optimize_am_lm_scale: bool + feature_flow_key: str + search_parameters: Dict + lm_lookahead: bool + lattice_to_ctm_kwargs: Dict + parallelize_conversion: bool + rtf: int + mem: int + lookahead_options: Optional[Dict] = None + epochs: Optional[List[int]] = None + native_ops: Optional[List[str]] = None + + +# TODO merge the two NnRecogArgs + +KeyedRecogArgsType = Dict[str, Union[Dict[str, Any], NnRecogArgs]] diff --git a/common/setups/rasr/util/nn/training.py b/common/setups/rasr/util/nn/training.py new file mode 100644 index 000000000..13a3fcf82 --- /dev/null +++ 
b/common/setups/rasr/util/nn/training.py @@ -0,0 +1,52 @@ +__all__ = ["ReturnnTrainingJobArgs", "EpochPartitioning", "ReturnnRasrTrainingArgs", "NnTrainingArgs"] + +from dataclasses import dataclass, field +from typing import Any, List, Optional, Set, TypedDict, Union + +from sisyphus import tk + +import i6_core.rasr as rasr + + +@dataclass() +class ReturnnTrainingJobArgs: + num_epochs: int + log_verbosity: int = field(default=4) + device: str = field(default="gpu") + save_interval: int = field(default=1) + keep_epochs: Optional[Union[List[int], Set[int]]] = None + time_rqmt: float = field(default=168) + mem_rqmt: float = field(default=14) + cpu_rqmt: int = field(default=4) + horovod_num_processes: Optional[int] = None + multi_node_slots: Optional[int] = None + returnn_python_exe: Optional[tk.Path] = None + returnn_root: Optional[tk.Path] = None + + +class EpochPartitioning(TypedDict): + dev: int + train: int + + +class ReturnnRasrTrainingArgs(TypedDict): + buffer_size: Optional[int] + class_label_file: Optional[tk.Path] + cpu_rqmt: Optional[int] + device: Optional[str] + disregarded_classes: Optional[Any] + extra_rasr_config: Optional[rasr.RasrConfig] + extra_rasr_post_config: Optional[rasr.RasrConfig] + horovod_num_processes: Optional[int] + keep_epochs: Optional[bool] + log_verbosity: Optional[int] + mem_rqmt: Optional[int] + num_classes: int + num_epochs: int + partition_epochs: Optional[EpochPartitioning] + save_interval: Optional[int] + time_rqmt: Optional[int] + use_python_control: Optional[bool] + + +NnTrainingArgs = ReturnnRasrTrainingArgs # Legacy compatibility