diff --git a/docs/source/core_concepts/disk_format.md b/docs/source/core_concepts/disk_format.md index 75f238d2..1bd9f7b4 100644 --- a/docs/source/core_concepts/disk_format.md +++ b/docs/source/core_concepts/disk_format.md @@ -18,8 +18,7 @@ folder │ │ └── sample_yyyyyyyyy │ └── infos.yaml └── problem_definition - ├── problem_infos.yaml - └── split.json (or split.csv for <=0.1.7) + └── problem_infos.yaml ``` - `dataset/samples/`: one directory per {py:class}`~plaid.containers.sample.Sample`. diff --git a/docs/source/core_concepts/feature_identifiers.md b/docs/source/core_concepts/feature_identifiers.md index 02ded2a8..5eda389b 100644 --- a/docs/source/core_concepts/feature_identifiers.md +++ b/docs/source/core_concepts/feature_identifiers.md @@ -121,7 +121,7 @@ Legacy name-based methods (e.g., `add_input_scalars_names`) are deprecated; pref - Always include enough context to disambiguate a feature. For fields/nodes on multiple bases/zones/times, set all relevant keys. - Use {py:meth}`~plaid.containers.sample.Sample.get_all_features_identifiers()` to introspect what identifiers exist in a sample. - Use sets to deduplicate identifiers safely: `set(list_of_identifiers)`. -- When authoring problem definitions on disk, {py:meth}`~plaid.problem_definition.ProblemDefinition._save_to_dir_` persists identifiers under `problem_definition/problem_infos.yaml` (keys `input_features` and `output_features`). +- When authoring problem definitions on disk, {py:meth}`~plaid.problem_definition.ProblemDefinition.save_to_dir` persists identifiers under `problem_definition/problem_infos.yaml` (keys `input_features` and `output_features`). 
## See also diff --git a/docs/source/core_concepts/problem_definition.md b/docs/source/core_concepts/problem_definition.md index 2db59b19..3198fe8e 100644 --- a/docs/source/core_concepts/problem_definition.md +++ b/docs/source/core_concepts/problem_definition.md @@ -27,7 +27,10 @@ pb.add_out_feature_identifier(FeatureIdentifier({ splits = {"train": [0, 1, 2], "test": [3, 4]} pb.set_split(splits) -pb._save_to_dir_("problem_definition") +pb.save_to_dir("problem_definition") + +# later +pb2 = ProblemDefinition.load("problem_definition") ``` {py:class}`~plaid.problem_definition.ProblemDefinition` supports filtering helpers to intersect existing inputs/outputs with a candidate list of identifiers. diff --git a/examples/post/bisect_example.py b/examples/post/bisect_example.py index 8ed21b62..c7c4c079 100644 --- a/examples/post/bisect_example.py +++ b/examples/post/bisect_example.py @@ -47,7 +47,7 @@ # Load PLAID datasets and problem metadata objects ref_ds = Dataset(dataset_directory / "dataset_ref") pred_ds = Dataset(dataset_directory / "dataset_near_pred") -problem = ProblemDefinition(dataset_directory / "problem_definition") +problem = ProblemDefinition.load(dataset_directory / "problem_definition") # Get output scalars from reference and prediction dataset ref_out_scalars, pred_out_scalars, out_scalars_names = prepare_datasets( @@ -98,7 +98,7 @@ # Load PLAID datasets and problem metadata objects ref_path = Dataset(dataset_directory / "dataset_ref") pred_path = Dataset(dataset_directory / "dataset_pred") -problem_path = ProblemDefinition(dataset_directory / "problem_definition") +problem_path = ProblemDefinition.load(dataset_directory / "problem_definition") # Using PLAID objects to generate bisect plot on feature_2 plot_bisect(ref_path, pred_path, problem_path, "feature_2", "equal_bisect_plot") @@ -114,7 +114,7 @@ # Mix ref_path = dataset_directory / "dataset_ref" pred_path = dataset_directory / "dataset_near_pred" -problem_path = ProblemDefinition(dataset_directory 
/ "problem_definition") +problem_path = ProblemDefinition.load(dataset_directory / "problem_definition") # Using scalar index and verbose option to generate bisect plot scalar_index = 0 @@ -129,4 +129,4 @@ os.remove("converge_bisect_plot.png") os.remove("differ_bisect_plot.png") -os.remove("equal_bisect_plot.png") \ No newline at end of file +os.remove("equal_bisect_plot.png") diff --git a/examples/post/metrics_example.py b/examples/post/metrics_example.py index 70ea4bd6..f23ee463 100644 --- a/examples/post/metrics_example.py +++ b/examples/post/metrics_example.py @@ -46,7 +46,7 @@ # Load PLAID datasets and problem metadata objects ref_ds = Dataset(dataset_directory / "dataset_ref") pred_ds = Dataset(dataset_directory / "dataset_near_pred") -problem = ProblemDefinition(dataset_directory / "problem_definition") +problem = ProblemDefinition.load(dataset_directory / "problem_definition") # Get output scalars from reference and prediction dataset ref_out_scalars, pred_out_scalars, out_scalars_names = prepare_datasets( @@ -102,7 +102,7 @@ # Load PLAID datasets and problem metadata objects ref_ds = Dataset(dataset_directory / "dataset_ref") pred_ds = Dataset(dataset_directory / "dataset_pred") -problem = ProblemDefinition(dataset_directory / "problem_definition") +problem = ProblemDefinition.load(dataset_directory / "problem_definition") # Pretty print activated with verbose mode metrics = compute_metrics(ref_ds, pred_ds, problem, "second_metrics", verbose=True) @@ -123,4 +123,4 @@ pretty_metrics(dictionary) os.remove("first_metrics.yaml") -os.remove("second_metrics.yaml") \ No newline at end of file +os.remove("second_metrics.yaml") diff --git a/examples/problem_definition_example.py b/examples/problem_definition_example.py index dccd4245..abbb5a5b 100644 --- a/examples/problem_definition_example.py +++ b/examples/problem_definition_example.py @@ -154,7 +154,7 @@ # ### Load a ProblemDefinition from a directory via initialization # %% -problem = 
ProblemDefinition(pb_def_save_fname) +problem = ProblemDefinition.load(pb_def_save_fname) print(problem) # %% [markdown] @@ -168,6 +168,5 @@ # ### Load from a directory via a Dataset instance # %% -problem = ProblemDefinition() -problem.load(pb_def_save_fname) +problem = ProblemDefinition.load(pb_def_save_fname) print(problem) diff --git a/src/plaid/bridges/huggingface_bridge.py b/src/plaid/bridges/huggingface_bridge.py index bbe8c9e6..bd18914b 100644 --- a/src/plaid/bridges/huggingface_bridge.py +++ b/src/plaid/bridges/huggingface_bridge.py @@ -375,8 +375,7 @@ def load_problem_definition_from_hub( with open(yaml_path, "r", encoding="utf-8") as f: yaml_data = yaml.safe_load(f) - prob_def = ProblemDefinition() - prob_def._initialize_from_problem_infos_dict(yaml_data) + prob_def = ProblemDefinition.model_validate(yaml_data) return prob_def @@ -484,9 +483,7 @@ def load_problem_definition_from_disk( Returns: ProblemDefinition: The loaded problem definition. """ - pb_def = ProblemDefinition() - pb_def._load_from_file_(Path(path) / Path("problem_definitions") / Path(name)) - return pb_def + return ProblemDefinition.load(Path(path) / Path("problem_definitions") / Path(name)) def load_tree_struct_from_disk( @@ -698,19 +695,33 @@ def huggingface_description_to_problem_definition( problem_definition = ProblemDefinition() for func, key in [ (problem_definition.set_task, "task"), + (problem_definition.set_score_function, "score_function"), (problem_definition.set_split, "split"), + ]: + if key in description: + func(description[key]) + + if "input_features" in description: + problem_definition.add_in_features_identifiers(description["input_features"]) + if "output_features" in description: + problem_definition.add_out_features_identifiers(description["output_features"]) + if "constant_features" in description: + problem_definition.add_constant_features_identifiers( + description["constant_features"] + ) + legacy_keys = [ (problem_definition.add_input_scalars_names, 
"in_scalars_names"), (problem_definition.add_output_scalars_names, "out_scalars_names"), (problem_definition.add_input_fields_names, "in_fields_names"), (problem_definition.add_output_fields_names, "out_fields_names"), + (problem_definition.add_input_timeseries_names, "in_timeseries_names"), + (problem_definition.add_output_timeseries_names, "out_timeseries_names"), (problem_definition.add_input_meshes_names, "in_meshes_names"), (problem_definition.add_output_meshes_names, "out_meshes_names"), - ]: - try: + ] + for func, key in legacy_keys: + if key in description: func(description[key]) - except KeyError: - logger.error(f"Could not retrieve key:'{key}' from description") - pass return problem_definition diff --git a/src/plaid/post/bisect.py b/src/plaid/post/bisect.py index 0832514d..881112af 100644 --- a/src/plaid/post/bisect.py +++ b/src/plaid/post/bisect.py @@ -104,7 +104,7 @@ def plot_bisect( if isinstance(pred_dataset, (str, Path)): pred_dataset: Dataset = Dataset(pred_dataset) if isinstance(problem_def, (str, Path)): - problem_def: ProblemDefinition = ProblemDefinition(problem_def) + problem_def: ProblemDefinition = ProblemDefinition.load(problem_def) # Load the testing_set # testing_set = problem_def.get_split("test") diff --git a/src/plaid/post/metrics.py b/src/plaid/post/metrics.py index 6e3fee7e..fc81906c 100644 --- a/src/plaid/post/metrics.py +++ b/src/plaid/post/metrics.py @@ -148,7 +148,7 @@ def compute_metrics( if isinstance(pred_dataset, (str, Path)): pred_dataset: Dataset = Dataset(pred_dataset) if isinstance(problem, (str, Path)): - problem: ProblemDefinition = ProblemDefinition(problem) + problem: ProblemDefinition = ProblemDefinition.load(problem) ### Get important formated values ### problem_split = problem.get_split() diff --git a/src/plaid/problem_definition.py b/src/plaid/problem_definition.py index a6dc3276..62845cbc 100644 --- a/src/plaid/problem_definition.py +++ b/src/plaid/problem_definition.py @@ -1,4 +1,4 @@ -"""Implementation of the 
`ProblemDefinition` class.""" +"""Problem definition schema based on Pydantic.""" # -*- coding: utf-8 -*- # @@ -7,18 +7,6 @@ # # -# %% Imports - -import sys - -if sys.version_info >= (3, 11): - from typing import Self -else: # pragma: no cover - from typing import TypeVar - - Self = TypeVar("Self") - -import csv import json import logging from pathlib import Path @@ -26,1813 +14,592 @@ import yaml from packaging.version import Version +from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator import plaid from plaid.constants import AUTHORIZED_SCORE_FUNCTIONS, AUTHORIZED_TASKS from plaid.containers import FeatureIdentifier from plaid.types import IndexType -from plaid.utils.deprecation import deprecated - -# %% Globals logger = logging.getLogger(__name__) -# %% Functions -# %% Classes +def _feature_sort_key(feat: Union[str, FeatureIdentifier]) -> tuple[str, str]: + if isinstance(feat, str): + return ("a_string", feat) + return ("b_feature", feat["type"]) -class ProblemDefinition(object): - """Gathers all necessary informations to define a learning problem.""" +class ProblemDefinition(BaseModel): + """Canonical representation of a learning problem.""" - def __init__( - self, - path: Optional[Union[str, Path]] = None, - directory_path: Optional[Union[str, Path]] = None, - ) -> None: - """Initialize an empty :class:`ProblemDefinition `. - - Use :meth:`add_inputs ` or :meth:`add_output_scalars_names ` to feed the :class:`ProblemDefinition` - - Args: - path (Union[str,Path], optional): The path from which to load PLAID problem definition files. - directory_path (Union[str,Path], optional): Deprecated, use `path` instead. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - - # 1. Create empty instance of ProblemDefinition - problem_definition = ProblemDefinition() - print(problem_definition) - >>> ProblemDefinition() - - # 2. 
Load problem definition and create ProblemDefinition instance - problem_definition = ProblemDefinition("path_to_plaid_prob_def") - print(problem_definition) - >>> ProblemDefinition(input_scalars_names=['s_1'], output_scalars_names=['s_2'], input_meshes_names=['mesh'], task='regression') - """ - self._name: str = None - self._version: Union[Version] = Version(plaid.__version__) - self._task: str = None - self._score_function: str = None - self.in_features_identifiers: Sequence[Union[str, FeatureIdentifier]] = [] - self.out_features_identifiers: Sequence[Union[str, FeatureIdentifier]] = [] - self.constant_features_identifiers: list[str] = [] - self.in_scalars_names: list[str] = [] - self.out_scalars_names: list[str] = [] - self.in_timeseries_names: list[str] = [] - self.out_timeseries_names: list[str] = [] - self.in_fields_names: list[str] = [] - self.out_fields_names: list[str] = [] - self.in_meshes_names: list[str] = [] - self.out_meshes_names: list[str] = [] - self._split: Optional[dict[str, IndexType]] = None - self._train_split: Optional[dict[str, dict[str, IndexType]]] = None - self._test_split: Optional[dict[str, dict[str, IndexType]]] = None - - if directory_path is not None: - if path is not None: - raise ValueError( - "Arguments `path` and `directory_path` cannot be both set. Use only `path` as `directory_path` is deprecated." - ) - else: - path = directory_path - logger.warning( - "DeprecationWarning: 'directory_path' is deprecated, use 'path' instead." 
- ) + model_config = ConfigDict( + arbitrary_types_allowed=True, + validate_assignment=True, + extra="forbid", + ) - if path is not None: - path = Path(path) - self._load_from_dir_(path) + name: Optional[str] = None + version: Version = Field(default_factory=lambda: Version(plaid.__version__)) + task: Optional[str] = None + score_function: Optional[str] = None + input_features: list[Union[str, FeatureIdentifier]] = Field(default_factory=list) + output_features: list[Union[str, FeatureIdentifier]] = Field(default_factory=list) + constant_features: list[str] = Field(default_factory=list) + input_scalars: list[str] = Field(default_factory=list) + output_scalars: list[str] = Field(default_factory=list) + input_fields: list[str] = Field(default_factory=list) + output_fields: list[str] = Field(default_factory=list) + input_timeseries: list[str] = Field(default_factory=list) + output_timeseries: list[str] = Field(default_factory=list) + input_meshes: list[str] = Field(default_factory=list) + output_meshes: list[str] = Field(default_factory=list) + split: Optional[dict[str, IndexType]] = None + train_split: Optional[dict[str, dict[str, IndexType]]] = None + test_split: Optional[dict[str, dict[str, IndexType]]] = None + + # Validators / serializers + @field_validator("version", mode="before") + @classmethod + def _coerce_version(cls, value: Optional[Union[str, Version]]) -> Optional[Version]: + if value is None: + return Version(plaid.__version__) + if isinstance(value, Version): + return value + return Version(value) + + @field_validator("task") + @classmethod + def _validate_task(cls, value: Optional[str]) -> Optional[str]: + if value is not None and value not in AUTHORIZED_TASKS: + raise ValueError( + f"{value} not among authorized tasks. Maybe you want to try among: {AUTHORIZED_TASKS}" + ) + return value - # -------------------------------------------------------------------------# - def get_name(self) -> str: - """Get the name. None if not defined. 
+ @field_validator("score_function") + @classmethod + def _validate_score_function(cls, value: Optional[str]) -> Optional[str]: + if value is not None and value not in AUTHORIZED_SCORE_FUNCTIONS: + raise ValueError( + f"{value} not among authorized score functions. Maybe you want to try among: {AUTHORIZED_SCORE_FUNCTIONS}" + ) + return value - Returns: - str: The name, such as "regression_1". - """ - return self._name + @field_validator("input_features", "output_features", mode="before") + @classmethod + def _coerce_features( + cls, value: Optional[Sequence[Union[str, FeatureIdentifier, dict]]] + ) -> list[Union[str, FeatureIdentifier]]: + if value is None: + return [] + coerced: list[Union[str, FeatureIdentifier]] = [] + for item in value: + if isinstance(item, dict): + coerced.append(FeatureIdentifier(**item)) + else: + coerced.append(item) + return coerced + + @field_serializer("version") + def _serialize_version(self, value: Optional[Version]) -> Optional[str]: + return str(value) if value is not None else None + + @field_serializer("input_features", "output_features") + def _serialize_features( + self, value: list[Union[str, FeatureIdentifier]] + ) -> list[Union[str, dict]]: + serialized: list[Union[str, dict]] = [] + for item in value: + if isinstance(item, FeatureIdentifier): + serialized.append(dict(**item)) + else: + serialized.append(item) + return serialized - def set_name(self, name: str) -> None: - """Set the name. + @classmethod + def model_validate(cls, obj, *args, **kwargs): + """Validate and possibly load from file.""" + if isinstance(obj, (str, Path)): + return cls.load(obj) + return super().model_validate(obj, *args, **kwargs) - Args: - name (str): The name, such as "regression_1".
- """ - if self._name is not None: - raise ValueError(f"A name is already in self._name: (`{self._name}`)") + def __init__( + self, + path: Optional[Union[str, Path]] = None, + directory_path: Optional[Union[str, Path]] = None, + **data, + ): + """Create a problem definition, optionally loading it from disk.""" + if path is not None and directory_path is not None: + raise ValueError( + "Arguments `path` and `directory_path` cannot be both set. Use only `path`." + ) + load_path = directory_path or path + if load_path is not None: + loaded = self.load(load_path) + super().__init__(**loaded.model_dump()) else: - self._name = name + super().__init__(**data) - # -------------------------------------------------------------------------# - def get_version(self) -> Version: - """Get the version. None if not defined. + # Basic setters/getters ------------------------------------------------- + def get_name(self) -> Optional[str]: + """Return the problem name.""" + return self.name - Returns: - Version: The version, such as "0.1.0". - """ - return self._version + def set_name(self, name: str) -> None: + """Set the problem name once.""" + if self.name is not None: + raise ValueError(f"A name is already set (`{self.name}`)") + self.name = name - # -------------------------------------------------------------------------# - def get_task(self) -> str: - """Get the authorized task. None if not defined. + def get_version(self) -> Version: + """Return the stored version.""" + return self.version - Returns: - str: The authorized task, such as "regression" or "classification". - """ - return self._task + def get_task(self) -> Optional[str]: + """Return the task type.""" + return self.task def set_task(self, task: str) -> None: - """Set the authorized task. - - Args: - task (str): The authorized task to be set, such as "regression" or "classification". 
- """ - if self._task is not None: - raise ValueError(f"A task is already in self._task: (`{self._task}`)") - elif task in AUTHORIZED_TASKS: - self._task = task - else: + """Set the task, enforcing allowed values and preventing overwrite.""" + if self.task is not None: + raise ValueError(f"A task is already set (`{self.task}`)") + if task not in AUTHORIZED_TASKS: raise TypeError( f"{task} not among authorized tasks. Maybe you want to try among: {AUTHORIZED_TASKS}" ) + self.task = task - # -------------------------------------------------------------------------# - def get_score_function(self) -> str: - """Get the authorized score function. None if not defined. - - Returns: - str: The authorized score function, such as "RRMSE". - """ - return self._score_function + def get_score_function(self) -> Optional[str]: + """Return the score function.""" + return self.score_function def set_score_function(self, score_function: str) -> None: - """Set the authorized score function. - - Args: - score_function (str): The authorized score function, such as "RRMSE". - """ - if self._score_function is not None: + """Set the score function, enforcing allowed values and preventing overwrite.""" + if self.score_function is not None: raise ValueError( - f"A score function is already in self._task: (`{self._score_function}`)" + f"A score function is already set (`{self.score_function}`)" ) - elif score_function in AUTHORIZED_SCORE_FUNCTIONS: - self._score_function = score_function - else: + if score_function not in AUTHORIZED_SCORE_FUNCTIONS: raise TypeError( f"{score_function} not among authorized tasks. Maybe you want to try among: {AUTHORIZED_SCORE_FUNCTIONS}" ) + self.score_function = score_function - # -------------------------------------------------------------------------# - - def get_split( - self, indices_name: Optional[str] = None - ) -> Union[IndexType, dict[str, IndexType]]: - """Get the split indices. 
This function returns the split indices, either for a specific split with the provided `indices_name` or all split indices if `indices_name` is not specified. - - Args: - indices_name (str, optional): The name of the split for which indices are requested. Defaults to None. - - Raises: - KeyError: If `indices_name` is specified but not found among split names. - - Returns: - Union[IndexType,dict[str,IndexType]]: If `indices_name` is provided, it returns - the indices for that split (IndexType). If `indices_name` is not provided, it - returns a dictionary mapping split names (str) to their respective indices - (IndexType). - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - split_indices = problem.get_split() - print(split_indices) - >>> {'train': [0, 1, 2, ...], 'test': [100, 101, ...]} - - test_indices = problem.get_split('test') - print(test_indices) - >>> [100, 101, ...] - """ - if indices_name is None: - return self._split - else: - assert indices_name in self._split, ( - indices_name + " not among split indices names" - ) - return self._split[indices_name] - - def set_split(self, split: dict[str, IndexType]) -> None: - """Set the split indices. This function allows you to set the split indices by providing a dictionary mapping split names (str) to their respective indices (IndexType). - - Args: - split (dict[str,IndexType]): A dictionary containing split names and their indices. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - new_split = {'train': [0, 1, 2], 'test': [3, 4]} - problem.set_split(new_split) - """ - if self._split is not None: # pragma: no cover - logger.warning("split already exists -> data will be replaced") - self._split = split - - def get_train_split( - self, indices_name: Optional[str] = None - ) -> Union[dict[str, IndexType], dict[str, dict[str, IndexType]]]: - """Get the train split indices for different subsets of the dataset. - - Args: - indices_name (str, optional): The name of the specific train split subset - for which indices are requested. Defaults to None. - - Returns: - Union[dict[str, IndexType], dict[str, dict[str, IndexType]]]: - If indices_name is provided: - - Returns a dictionary mapping split names to their indices for the specified subset. - If indices_name is None: - - Returns the complete train split dictionary containing all subsets and their indices. - - Raises: - AssertionError: If indices_name is provided but not found in the train split. - """ - if indices_name is None: - return self._train_split - else: - assert indices_name in self._train_split, ( - indices_name + " not among split indices names" - ) - return self._train_split[indices_name] - - def set_train_split(self, split: dict[str, dict[str, Optional[IndexType]]]) -> None: - """Set the train split dictionary containing subsets and their indices. - - Args: - split (dict[str, dict[str, IndexType]]): Dictionary mapping train subset names - to their split dictionaries. Each split dictionary maps split names (e.g., 'train', 'val') - to their indices. - - Note: - If a train split already exists, it will be replaced and a warning will be logged. 
- """ - if self._train_split is not None: # pragma: no cover - logger.warning("split already exists -> data will be replaced") - self._train_split = split - - def get_test_split( - self, indices_name: Optional[str] = None - ) -> Union[dict[str, IndexType], dict[str, dict[str, IndexType]]]: - """Get the test split indices for different subsets of the dataset. - - Args: - indices_name (str, optional): The name of the specific test split subset - for which indices are requested. Defaults to None. - - Returns: - Union[dict[str, IndexType], dict[str, dict[str, IndexType]]]: - If indices_name is provided: - - Returns a dictionary mapping split names to their indices for the specified subset. - If indices_name is None: - - Returns the complete test split dictionary containing all subsets and their indices. - - Raises: - AssertionError: If indices_name is provided but not found in the test split. - """ - if indices_name is None: - return self._test_split - else: - assert indices_name in self._test_split, ( - indices_name + " not among split indices names" - ) - return self._test_split[indices_name] - - def set_test_split(self, split: dict[str, dict[str, Optional[IndexType]]]) -> None: - """Set the test split dictionary containing subsets and their indices. - - Args: - split (dict[str, dict[str, IndexType]]): Dictionary mapping test subset names - to their split dictionaries. Each split dictionary maps split names (e.g., 'test', 'test_ood') - to their indices. - - Note: - If a test split already exists, it will be replaced and a warning will be logged. 
- """ - if self._test_split is not None: # pragma: no cover - logger.warning("split already exists -> data will be replaced") - self._test_split = split - - # -------------------------------------------------------------------------# - @staticmethod - def _feature_sort_key(feat: Union[str, FeatureIdentifier]) -> tuple[str, str]: - if isinstance(feat, str): - # Strings first, sorted lexicographically - return ("a_string", feat) - else: - assert isinstance(feat, FeatureIdentifier) - # Then FeatureIdentifiers, sorted by their "type" field - return ("b_feature", feat["type"]) - - def get_in_features_identifiers(self) -> Sequence[Union[str, FeatureIdentifier]]: - """Get the input features identifiers of the problem. - - Returns: - Sequence[Union[str, FeatureIdentifier]]: A list of input feature identifiers. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - in_features_identifiers = problem.get_in_features_identifiers() - print(in_features_identifiers) - >>> ['omega', 'pressure'] - """ - return self.in_features_identifiers + # Feature helpers ------------------------------------------------------- + def get_in_features_identifiers(self) -> list[Union[str, FeatureIdentifier]]: + """Return input feature identifiers.""" + return list(self.input_features) def add_in_features_identifiers( self, inputs: Sequence[Union[str, FeatureIdentifier]] ) -> None: - """Add input features identifiers to the problem. - - Args: - inputs (Sequence[Union[str, FeatureIdentifier]]): A list of input feature identifiers to add. - - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - in_features_identifiers = ['omega', 'pressure'] - problem.add_in_features_identifiers(in_features_identifiers) - """ - if not (len(set(inputs)) == len(inputs)): + """Add multiple input feature identifiers, rejecting duplicates.""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same identifiers") - for input in inputs: - self.add_in_feature_identifier(input) + for inp in inputs: + self.add_in_feature_identifier(inp) def add_in_feature_identifier(self, input: Union[str, FeatureIdentifier]) -> None: - """Add an input feature identifier or identifier to the problem. - - Args: - input (FeatureIdentifier): The identifier or identifier of the input feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of inputs. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - input_identifier = 'pressure' - problem.add_in_feature_identifier(input_identifier) - """ - if input in self.in_features_identifiers: - raise ValueError(f"{input} is already in self.in_features_identifiers") - self.in_features_identifiers.append(input) - self.in_features_identifiers.sort(key=self._feature_sort_key) + """Add a single input feature identifier.""" + if input in self.input_features: + raise ValueError(f"{input} is already in input_features") + self.input_features.append(input) + self.input_features.sort(key=_feature_sort_key) def filter_in_features_identifiers( self, identifiers: Sequence[Union[str, FeatureIdentifier]] - ) -> Sequence[Union[str, FeatureIdentifier]]: - """Filter and get input features features corresponding to a sorted list of identifiers. - - Args: - identifiers (Sequence[Union[str, FeatureIdentifier]]): A list of identifiers for which to retrieve corresponding input features. 
- - Returns: - Sequence[Union[str, FeatureIdentifier]]: A sorted list of input feature identifiers or categories corresponding to the provided identifiers. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - features_identifiers = ['omega', 'pressure', 'temperature'] - input_features = problem.filter_in_features_identifiers(features_identifiers) - print(input_features) - >>> ['omega', 'pressure'] - """ - return sorted(set(identifiers).intersection(self.get_in_features_identifiers())) - - # -------------------------------------------------------------------------# - def get_out_features_identifiers(self) -> Sequence[Union[str, FeatureIdentifier]]: - """Get the output features identifiers of the problem. - - Returns: - Sequence[Union[str, FeatureIdentifier]]: A list of output feature identifiers. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - outputs_identifiers = problem.get_out_features_identifiers() - print(outputs_identifiers) - >>> ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - """ - return self.out_features_identifiers + ) -> list[Union[str, FeatureIdentifier]]: + """Return registered input identifiers matching the provided list.""" + return sorted( + set(identifiers).intersection(self.get_in_features_identifiers()), + key=_feature_sort_key, + ) + + def get_out_features_identifiers(self) -> list[Union[str, FeatureIdentifier]]: + """Return output feature identifiers.""" + return list(self.output_features) def add_out_features_identifiers( self, outputs: Sequence[Union[str, FeatureIdentifier]] ) -> None: - """Add output features identifiers to the problem. - - Args: - outputs (Sequence[Union[str, FeatureIdentifier]]): A list of output feature identifiers to add. - - Raises: - ValueError: if some :code:`outputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - out_features_identifiers = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - problem.add_out_features_identifiers(out_features_identifiers) - """ - if not (len(set(outputs)) == len(outputs)): + """Add multiple output feature identifiers, rejecting duplicates.""" + if len(set(outputs)) != len(outputs): raise ValueError("Some outputs have same identifiers") - for output in outputs: - self.add_out_feature_identifier(output) + for out in outputs: + self.add_out_feature_identifier(out) def add_out_feature_identifier(self, output: Union[str, FeatureIdentifier]) -> None: - """Add an output feature identifier or identifier to the problem. - - Args: - output (FeatureIdentifier): The identifier or identifier of the output feature to add. - - Raises: - ValueError: If the specified output feature is already in the list of outputs. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - out_features_identifiers = 'pressure' - problem.add_out_feature_identifier(out_features_identifiers) - """ - if output in self.out_features_identifiers: - raise ValueError(f"{output} is already in self.out_features_identifiers") - self.out_features_identifiers.append(output) - self.out_features_identifiers.sort(key=self._feature_sort_key) + """Add a single output feature identifier.""" + if output in self.output_features: + raise ValueError(f"{output} is already in output_features") + self.output_features.append(output) + self.output_features.sort(key=_feature_sort_key) def filter_out_features_identifiers( self, identifiers: Sequence[Union[str, FeatureIdentifier]] - ) -> Sequence[Union[str, FeatureIdentifier]]: - """Filter and get output features corresponding to a sorted list of identifiers. 
- - Args: - identifiers (Sequence[Union[str, FeatureIdentifier]]): A list of identifiers for which to retrieve corresponding output features. - - Returns: - Sequence[Union[str, FeatureIdentifier]]: A sorted list of output feature identifiers or categories corresponding to the provided identifiers. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - features_identifiers = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - output_features = problem.filter_out_features_identifiers(features_identifiers) - print(output_features) - >>> ['in_massflow'] - """ + ) -> list[Union[str, FeatureIdentifier]]: + """Return registered output identifiers matching the provided list.""" return sorted( - set(identifiers).intersection(self.get_out_features_identifiers()) + set(identifiers).intersection(self.get_out_features_identifiers()), + key=_feature_sort_key, ) - # -------------------------------------------------------------------------# def get_constant_features_identifiers(self) -> list[str]: - """Get the constant features identifiers of the problem. - - Returns: - list[str]: A list of constant feature identifiers. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - constant_features_identifiers = problem.get_constant_features_identifiers() - print(constant_features_identifiers) - >>> ['Global/P', 'Base_2_2/Zone/GridCoordinates'] - """ - return self.constant_features_identifiers - - def add_constant_features_identifiers(self, inputs: list[str]) -> None: - """Add input features identifiers to the problem. - - Args: - inputs (list[str]): A list of constant feature identifiers to add. + """Return constant feature identifiers.""" + return list(self.constant_features) - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - constant_features_identifiers = ['Global/P', 'Base_2_2/Zone/GridCoordinates'] - problem.add_constant_features_identifiers(constant_features_identifiers) - """ - if not (len(set(inputs)) == len(inputs)): + def add_constant_features_identifiers(self, inputs: Sequence[str]) -> None: + """Add multiple constant feature identifiers, rejecting duplicates.""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same identifiers") - for input in inputs: - self.add_constant_feature_identifier(input) + for inp in inputs: + self.add_constant_feature_identifier(inp) def add_constant_feature_identifier(self, input: str) -> None: - """Add an constant feature identifier to the problem. - - Args: - input (str): The identifier of the constant feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of constant features. - - Example: - .. code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - constant_identifier = 'Global/P' - problem.add_constant_feature_identifier(constant_identifier) - """ - if input in self.constant_features_identifiers: - raise ValueError(f"{input} is already in self.in_features_identifiers") - self.constant_features_identifiers.append(input) - self.constant_features_identifiers.sort(key=self._feature_sort_key) - - def filter_constant_features_identifiers(self, identifiers: list[str]) -> list[str]: - """Filter and get input features features corresponding to a sorted list of identifiers. - - Args: - identifiers (list[str]): A list of identifiers for which to retrieve corresponding constant features. - - Returns: - list[str]: A sorted list of constant feature identifiers corresponding to the provided identifiers. - - Example: - .. 
code-block:: python - - from plaid.problem_definition import ProblemDefinition - problem = ProblemDefinition() - # [...] - features_identifiers = ['Global/P', 'Base_2_2/Zone/GridCoordinates'] - constant_features = problem.filter_constant_features_identifiers(features_identifiers) - print(constant_features) - >>> ['Global/P'] - """ + """Add a single constant feature identifier.""" + if input in self.constant_features: + raise ValueError(f"{input} is already in constant_features") + self.constant_features.append(input) + self.constant_features.sort() + + def filter_constant_features_identifiers( + self, identifiers: Sequence[str] + ) -> list[str]: + """Return registered constant identifiers matching the provided list.""" return sorted( set(identifiers).intersection(self.get_constant_features_identifiers()) ) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) + # Legacy name-based helpers -------------------------------------------- def get_input_scalars_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_in_features_identifiers` instead. - - Get the input scalars names of the problem. - - Returns: - list[str]: A list of input feature names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - input_scalars_names = problem.get_input_scalars_names() - print(input_scalars_names) - >>> ['omega', 'pressure'] - """ - return self.in_scalars_names - - @deprecated( - "use `add_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_input_scalars_names(self, inputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_features_identifiers` instead. - - Add input scalars names to the problem. - - Args: - inputs (list[str]): A list of input feature names to add. 
+ """Return input scalar names (legacy).""" + return list(self.input_scalars) - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_scalars_names = ['omega', 'pressure'] - problem.add_input_scalars_names(input_scalars_names) - """ - if not (len(set(inputs)) == len(inputs)): + def add_input_scalars_names(self, inputs: Sequence[str]) -> None: + """Add input scalar names (legacy).""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same names") - for input in inputs: - self.add_input_scalar_name(input) + for inp in inputs: + self.add_input_scalar_name(inp) - @deprecated( - "use `add_in_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_input_scalar_name(self, input: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_feature_identifier` instead. - - Add an input scalar name to the problem. - - Args: - input (str): The name of the input feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of inputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_name = 'pressure' - problem.add_input_scalar_name(input_name) - """ - if input in self.in_scalars_names: - raise ValueError(f"{input} is already in self.in_scalars_names") - self.in_scalars_names.append(input) - self.in_scalars_names.sort() - - @deprecated( - "use `filter_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def filter_input_scalars_names(self, names: list[str]) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.filter_in_features_identifiers` instead. - - Filter and get input scalars features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding input features. 
- - Returns: - list[str]: A sorted list of input feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - scalars_names = ['omega', 'pressure', 'temperature'] - input_features = problem.filter_input_scalars_names(scalars_names) - print(input_features) - >>> ['omega', 'pressure'] - """ + """Add a single input scalar name (legacy).""" + if input in self.input_scalars: + raise ValueError(f"{input} is already in input_scalars") + self.input_scalars.append(input) + self.input_scalars.sort() + + def filter_input_scalars_names(self, names: Sequence[str]) -> list[str]: + """Filter input scalar names (legacy).""" return sorted(set(names).intersection(self.get_input_scalars_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_output_scalars_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_out_features_identifiers` instead. - - Get the output scalars names of the problem. - - Returns: - list[str]: A list of output feature names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - outputs_names = problem.get_output_scalars_names() - print(outputs_names) - >>> ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - """ - return self.out_scalars_names - - @deprecated( - "use `add_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_output_scalars_names(self, outputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_features_identifiers` instead. - - Add output scalars names to the problem. - - Args: - outputs (list[str]): A list of output feature names to add. 
+ """Return output scalar names (legacy).""" + return list(self.output_scalars) - Raises: - ValueError: if some :code:`outputs` are redondant. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_scalars_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - problem.add_output_scalars_names(output_scalars_names) - """ - if not (len(set(outputs)) == len(outputs)): + def add_output_scalars_names(self, outputs: Sequence[str]) -> None: + """Add output scalar names (legacy).""" + if len(set(outputs)) != len(outputs): raise ValueError("Some outputs have same names") - for output in outputs: - self.add_output_scalar_name(output) + for out in outputs: + self.add_output_scalar_name(out) - @deprecated( - "use `add_out_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_output_scalar_name(self, output: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_feature_identifier` instead. - - Add an output scalar name to the problem. - - Args: - output (str): The name of the output feature to add. - - Raises: - ValueError: If the specified output feature is already in the list of outputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_scalars_names = 'pressure' - problem.add_output_scalar_name(output_scalars_names) - """ - if output in self.out_scalars_names: - raise ValueError(f"{output} is already in self.out_scalars_names") - self.out_scalars_names.append(output) - self.in_scalars_names.sort() - - def filter_output_scalars_names(self, names: list[str]) -> list[str]: - """Filter and get output features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding output features. - - Returns: - list[str]: A sorted list of output feature names or categories corresponding to the provided names. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - scalars_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - output_features = problem.filter_output_scalars_names(scalars_names) - print(output_features) - >>> ['in_massflow'] - """ + """Add a single output scalar name (legacy).""" + if output in self.output_scalars: + raise ValueError(f"{output} is already in output_scalars") + self.output_scalars.append(output) + self.output_scalars.sort() + + def filter_output_scalars_names(self, names: Sequence[str]) -> list[str]: + """Filter output scalar names (legacy).""" return sorted(set(names).intersection(self.get_output_scalars_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_input_fields_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_in_features_identifiers` instead. + """Return input field names (legacy).""" + return list(self.input_fields) - Get the input fields names of the problem. - - Returns: - list[str]: A list of input feature names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - input_fields_names = problem.get_input_fields_names() - print(input_fields_names) - >>> ['omega', 'pressure'] - """ - return self.in_fields_names - - @deprecated( - "use `add_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_input_fields_names(self, inputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_features_identifiers` instead. - - Add input fields names to the problem. - - Args: - inputs (list[str]): A list of input feature names to add. - - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_fields_names = ['omega', 'pressure'] - problem.add_input_fields_names(input_fields_names) - """ - if not (len(set(inputs)) == len(inputs)): + def add_input_fields_names(self, inputs: Sequence[str]) -> None: + """Add input field names (legacy).""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same names") - for input in inputs: - self.add_input_field_name(input) + for inp in inputs: + self.add_input_field_name(inp) - @deprecated( - "use `add_in_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_input_field_name(self, input: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_feature_identifier` instead. - - Add an input field name to the problem. - - Args: - input (str): The name of the input feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of inputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_name = 'pressure' - problem.add_input_field_name(input_name) - """ - if input in self.in_fields_names: - raise ValueError(f"{input} is already in self.in_fields_names") - self.in_fields_names.append(input) - self.in_fields_names.sort() - - def filter_input_fields_names(self, names: list[str]) -> list[str]: - """Filter and get input fields features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding input features. - - Returns: - list[str]: A sorted list of input feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- input_fields_names = ['omega', 'pressure', 'temperature'] - input_features = problem.filter_input_fields_names(input_fields_names) - print(input_features) - >>> ['omega', 'pressure'] - """ + """Add a single input field name (legacy).""" + if input in self.input_fields: + raise ValueError(f"{input} is already in input_fields") + self.input_fields.append(input) + self.input_fields.sort() + + def filter_input_fields_names(self, names: Sequence[str]) -> list[str]: + """Filter input field names (legacy).""" return sorted(set(names).intersection(self.get_input_fields_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_output_fields_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_out_features_identifiers` instead. - - Get the output fields names of the problem. - - Returns: - list[str]: A list of output feature names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - outputs_names = problem.get_output_fields_names() - print(outputs_names) - >>> ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - """ - return self.out_fields_names + """Return output field names (legacy).""" + return list(self.output_fields) - @deprecated( - "use `add_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_output_fields_names(self, outputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_features_identifiers` instead. - - Add output fields names to the problem. - - Args: - outputs (list[str]): A list of output feature names to add. - - Raises: - ValueError: if some :code:`outputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_fields_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - problem.add_output_fields_names(output_fields_names) - """ - if not (len(set(outputs)) == len(outputs)): + def add_output_fields_names(self, outputs: Sequence[str]) -> None: + """Add output field names (legacy).""" + if len(set(outputs)) != len(outputs): raise ValueError("Some outputs have same names") - for output in outputs: - self.add_output_field_name(output) + for out in outputs: + self.add_output_field_name(out) - @deprecated( - "use `add_out_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_output_field_name(self, output: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_feature_identifier` instead. - - Add an output field name to the problem. - - Args: - output (str): The name of the output feature to add. - - Raises: - ValueError: If the specified output feature is already in the list of outputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_fields_names = 'pressure' - problem.add_output_field_name(output_fields_names) - """ - if output in self.out_fields_names: - raise ValueError(f"{output} is already in self.out_fields_names") - self.out_fields_names.append(output) - self.out_fields_names.sort() - - def filter_output_fields_names(self, names: list[str]) -> list[str]: - """Filter and get output features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding output features. - - Returns: - list[str]: A sorted list of output feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- output_fields_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - output_features = problem.filter_output_fields_names(output_fields_names) - print(output_features) - >>> ['in_massflow'] - """ + """Add a single output field name (legacy).""" + if output in self.output_fields: + raise ValueError(f"{output} is already in output_fields") + self.output_fields.append(output) + self.output_fields.sort() + + def filter_output_fields_names(self, names: Sequence[str]) -> list[str]: + """Filter output field names (legacy).""" return sorted(set(names).intersection(self.get_output_fields_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_input_timeseries_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_in_features_identifiers` instead. - - Get the input timeseries names of the problem. - - Returns: - list[str]: A list of input feature names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - input_timeseries_names = problem.get_input_timeseries_names() - print(input_timeseries_names) - >>> ['omega', 'pressure'] - """ - return self.in_timeseries_names + """Return input timeseries names (legacy).""" + return list(self.input_timeseries) - @deprecated( - "use `add_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_input_timeseries_names(self, inputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_features_identifiers` instead. - - Add input timeseries names to the problem. - - Args: - inputs (list[str]): A list of input feature names to add. - - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_timeseries_names = ['omega', 'pressure'] - problem.add_input_timeseries_names(input_timeseries_names) - """ - if not (len(set(inputs)) == len(inputs)): + def add_input_timeseries_names(self, inputs: Sequence[str]) -> None: + """Add input timeseries names (legacy).""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same names") - for input in inputs: - self.add_input_timeseries_name(input) + for inp in inputs: + self.add_input_timeseries_name(inp) - @deprecated( - "use `add_in_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_input_timeseries_name(self, input: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_feature_identifier` instead. - - Add an input timeseries name to the problem. - - Args: - input (str): The name of the input feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of inputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_name = 'pressure' - problem.add_input_timeseries_name(input_name) - """ - if input in self.in_timeseries_names: - raise ValueError(f"{input} is already in self.in_timeseries_names") - self.in_timeseries_names.append(input) - self.in_timeseries_names.sort() - - def filter_input_timeseries_names(self, names: list[str]) -> list[str]: - """Filter and get input timeseries features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding input features. - - Returns: - list[str]: A sorted list of input feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- input_timeseries_names = ['omega', 'pressure', 'temperature'] - input_features = problem.filter_input_timeseries_names(input_timeseries_names) - print(input_features) - >>> ['omega', 'pressure'] - """ + """Add a single input timeseries name (legacy).""" + if input in self.input_timeseries: + raise ValueError(f"{input} is already in input_timeseries") + self.input_timeseries.append(input) + self.input_timeseries.sort() + + def filter_input_timeseries_names(self, names: Sequence[str]) -> list[str]: + """Filter input timeseries names (legacy).""" return sorted(set(names).intersection(self.get_input_timeseries_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_output_timeseries_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_out_features_identifiers` instead. - - Get the output timeseries names of the problem. - - Returns: - list[str]: A list of output feature names. - - Example: - .. code-block:: python + """Return output timeseries names (legacy).""" + return list(self.output_timeseries) - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - outputs_names = problem.get_output_timeseries_names() - print(outputs_names) - >>> ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - """ - return self.out_timeseries_names - - @deprecated( - "use `add_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_output_timeseries_names(self, outputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_features_identifiers` instead. - - Add output timeseries names to the problem. - - Args: - outputs (list[str]): A list of output feature names to add. - - Raises: - ValueError: if some :code:`outputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_timeseries_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - problem.add_output_timeseries_names(output_timeseries_names) - """ - if not (len(set(outputs)) == len(outputs)): + def add_output_timeseries_names(self, outputs: Sequence[str]) -> None: + """Add output timeseries names (legacy).""" + if len(set(outputs)) != len(outputs): raise ValueError("Some outputs have same names") - for output in outputs: - self.add_output_timeseries_name(output) + for out in outputs: + self.add_output_timeseries_name(out) - @deprecated( - "use `add_out_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_output_timeseries_name(self, output: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_feature_identifier` instead. - - Add an output timeseries name to the problem. - - Args: - output (str): The name of the output feature to add. - - Raises: - ValueError: If the specified output feature is already in the list of outputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_timeseries_names = 'pressure' - problem.add_output_timeseries_name(output_timeseries_names) - """ - if output in self.out_timeseries_names: - raise ValueError(f"{output} is already in self.out_timeseries_names") - self.out_timeseries_names.append(output) - self.in_timeseries_names.sort() - - def filter_output_timeseries_names(self, names: list[str]) -> list[str]: - """Filter and get output features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding output features. - - Returns: - list[str]: A sorted list of output feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- output_timeseries_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - output_features = problem.filter_output_timeseries_names(output_timeseries_names) - print(output_features) - >>> ['in_massflow'] - """ + """Add a single output timeseries name (legacy).""" + if output in self.output_timeseries: + raise ValueError(f"{output} is already in output_timeseries") + self.output_timeseries.append(output) + self.output_timeseries.sort() + + def filter_output_timeseries_names(self, names: Sequence[str]) -> list[str]: + """Filter output timeseries names (legacy).""" return sorted(set(names).intersection(self.get_output_timeseries_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_input_meshes_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_in_features_identifiers` instead. - - Get the input meshes names of the problem. - - Returns: - list[str]: A list of input feature names. - - Example: - .. code-block:: python + """Return input mesh names (legacy).""" + return list(self.input_meshes) - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - input_meshes_names = problem.get_input_meshes_names() - print(input_meshes_names) - >>> ['omega', 'pressure'] - """ - return self.in_meshes_names - - @deprecated( - "use `add_in_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_input_meshes_names(self, inputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_features_identifiers` instead. - - Add input meshes names to the problem. - - Args: - inputs (list[str]): A list of input feature names to add. - - Raises: - ValueError: If some :code:`inputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_meshes_names = ['omega', 'pressure'] - problem.add_input_meshes_names(input_meshes_names) - """ - if not (len(set(inputs)) == len(inputs)): + def add_input_meshes_names(self, inputs: Sequence[str]) -> None: + """Add input mesh names (legacy).""" + if len(set(inputs)) != len(inputs): raise ValueError("Some inputs have same names") - for input in inputs: - self.add_input_mesh_name(input) + for inp in inputs: + self.add_input_mesh_name(inp) - @deprecated( - "use `add_in_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_input_mesh_name(self, input: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_in_feature_identifier` instead. - - Add an input mesh name to the problem. - - Args: - input (str): The name of the input feature to add. - - Raises: - ValueError: If the specified input feature is already in the list of inputs. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - input_name = 'pressure' - problem.add_input_mesh_name(input_name) - """ - if input in self.in_meshes_names: - raise ValueError(f"{input} is already in self.in_meshes_names") - self.in_meshes_names.append(input) - self.in_meshes_names.sort() - - def filter_input_meshes_names(self, names: list[str]) -> list[str]: - """Filter and get input meshes features corresponding to a list of names. - - Args: - names (list[str]): A list of names for which to retrieve corresponding input features. - - Returns: - list[str]: A sorted list of input feature names or categories corresponding to the provided names. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- input_meshes_names = ['omega', 'pressure', 'temperature'] - input_features = problem.filter_input_meshes_names(input_meshes_names) - print(input_features) - >>> ['omega', 'pressure'] - """ + """Add a single input mesh name (legacy).""" + if input in self.input_meshes: + raise ValueError(f"{input} is already in input_meshes") + self.input_meshes.append(input) + self.input_meshes.sort() + + def filter_input_meshes_names(self, names: Sequence[str]) -> list[str]: + """Filter input mesh names (legacy).""" return sorted(set(names).intersection(self.get_input_meshes_names())) - # -------------------------------------------------------------------------# - @deprecated( - "use `get_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) def get_output_meshes_names(self) -> list[str]: - """DEPRECATED: use :meth:`ProblemDefinition.get_out_features_identifiers` instead. - - Get the output meshes names of the problem. - - Returns: - list[str]: A list of output feature names. + """Return output mesh names (legacy).""" + return list(self.output_meshes) - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - outputs_names = problem.get_output_meshes_names() - print(outputs_names) - >>> ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - """ - return self.out_meshes_names - - @deprecated( - "use `add_out_features_identifiers` instead", version="0.1.8", removal="0.2.0" - ) - def add_output_meshes_names(self, outputs: list[str]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_features_identifiers` instead. - - Add output meshes names to the problem. - - Args: - outputs (list[str]): A list of output feature names to add. - - Raises: - ValueError: if some :code:`outputs` are redondant. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_meshes_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - problem.add_output_meshes_names(output_meshes_names) - """ - if not (len(set(outputs)) == len(outputs)): + def add_output_meshes_names(self, outputs: Sequence[str]) -> None: + """Add output mesh names (legacy).""" + if len(set(outputs)) != len(outputs): raise ValueError("Some outputs have same names") - for output in outputs: - self.add_output_mesh_name(output) + for out in outputs: + self.add_output_mesh_name(out) - @deprecated( - "use `add_out_feature_identifier` instead", version="0.1.8", removal="0.2.0" - ) def add_output_mesh_name(self, output: str) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.add_out_feature_identifier` instead. - - Add an output mesh name to the problem. - - Args: - output (str): The name of the output feature to add. - - Raises: - ValueError: If the specified output feature is already in the list of outputs. - - Example: - .. 
code-block:: python + """Add a single output mesh name (legacy).""" + if output in self.output_meshes: + raise ValueError(f"{output} is already in output_meshes") + self.output_meshes.append(output) + self.output_meshes.sort() + + def filter_output_meshes_names(self, names: Sequence[str]) -> list[str]: + """Filter output mesh names (legacy).""" + return sorted(set(names).intersection(self.get_output_meshes_names())) - from plaid import ProblemDefinition - problem = ProblemDefinition() - output_meshes_names = 'pressure' - problem.add_output_mesh_name(output_meshes_names) - """ - if output in self.out_meshes_names: - raise ValueError(f"{output} is already in self.out_meshes_names") - self.out_meshes_names.append(output) - self.in_meshes_names.sort() + # Splits ---------------------------------------------------------------- + def get_split( + self, indices_name: Optional[str] = None + ) -> Union[IndexType, dict[str, IndexType], None]: + """Return the full split or a named subset.""" + if self.split is None: + return None + if indices_name is None: + return self.split + if indices_name not in self.split: + raise KeyError(indices_name + " not among split indices names") + return self.split[indices_name] - def filter_output_meshes_names(self, names: list[str]) -> list[str]: - """Filter and get output features corresponding to a list of names. + def set_split(self, split: dict[str, IndexType]) -> None: + """Set the main split mapping.""" + if self.split is not None: + logger.warning("split already exists -> data will be replaced") + self.split = split - Args: - names (list[str]): A list of names for which to retrieve corresponding output features. 
+ def get_train_split( + self, indices_name: Optional[str] = None + ) -> Union[dict[str, IndexType], dict[str, dict[str, IndexType]], None]: + """Return the train split dictionary or a named subset.""" + if self.train_split is None: + return None + if indices_name is None: + return self.train_split + if indices_name not in self.train_split: + raise KeyError(indices_name + " not among split indices names") + return self.train_split[indices_name] - Returns: - list[str]: A sorted list of output feature names or categories corresponding to the provided names. + def set_train_split(self, split: dict[str, dict[str, IndexType]]) -> None: + """Set the train split mapping.""" + if self.train_split is not None: + logger.warning("train_split already exists -> data will be replaced") + self.train_split = split - Example: - .. code-block:: python + def get_test_split( + self, indices_name: Optional[str] = None + ) -> Union[dict[str, IndexType], dict[str, dict[str, IndexType]], None]: + """Return the test split dictionary or a named subset.""" + if self.test_split is None: + return None + if indices_name is None: + return self.test_split + if indices_name not in self.test_split: + raise KeyError(indices_name + " not among split indices names") + return self.test_split[indices_name] - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] 
- output_meshes_names = ['compression_rate', 'in_massflow', 'isentropic_efficiency'] - output_features = problem.filter_output_meshes_names(output_meshes_names) - print(output_features) - >>> ['in_massflow'] - """ - return sorted(set(names).intersection(self.get_output_meshes_names())) + def set_test_split(self, split: dict[str, dict[str, IndexType]]) -> None: + """Set the test split mapping.""" + if self.test_split is not None: + logger.warning("test_split already exists -> data will be replaced") + self.test_split = split - # -------------------------------------------------------------------------# def get_all_indices(self) -> list[int]: - """Get all indices from splits. - - Returns: - list[int]: list containing all unique indices. - """ - all_indices = [] - for indices in self.get_split().values(): + """Return the set of all indices present in the main split.""" + if self.split is None: + return [] + all_indices: list[int] = [] + for indices in self.split.values(): all_indices += list(indices) return list(set(all_indices)) - # -------------------------------------------------------------------------# - def _generate_problem_infos_dict(self) -> dict[str, Union[str, list]]: - """Generate a dictionary containing all relevant problem definition data. - - Returns: - dict[str, Union[str, list]]: A dictionary with keys for task, input/output features, scalars, fields, timeseries, and meshes. 
- """ - data = { - "task": self._task, - "score_function": self._score_function, - "constant_features": [], - "input_features": [], - "output_features": [], - } - for tup in self.in_features_identifiers: - if isinstance(tup, FeatureIdentifier): - data["input_features"].append(dict(**tup)) - else: - data["input_features"].append(tup) - for tup in self.out_features_identifiers: - if isinstance(tup, FeatureIdentifier): - data["output_features"].append(dict(**tup)) - else: - data["output_features"].append(tup) - for tup in self.constant_features_identifiers: - data["constant_features"].append(tup) - if self._train_split is not None: - data["train_split"] = self._train_split - if self._test_split is not None: - data["test_split"] = self._test_split - if self._name is not None: - data["name"] = self._name - if Version(plaid.__version__) < Version("0.2.0"): - data.update( - { - k: v - for k, v in { - "input_scalars": self.in_scalars_names, - "output_scalars": self.out_scalars_names, - "input_fields": self.in_fields_names, - "output_fields": self.out_fields_names, - "input_timeseries": self.in_timeseries_names, - "output_timeseries": self.out_timeseries_names, - "input_meshes": self.in_meshes_names, - "output_meshes": self.out_meshes_names, - }.items() - if v # keeps only truthy (non-empty, non-None) lists - } - ) - - # Handle version - plaid_version = Version(plaid.__version__) - if self._version != plaid_version: # pragma: no cover - logger.warning( - f"Version mismatch: ProblemDefinition was loaded from version {self._version if self._version is not None else 'anterior to 0.1.10'}, and will be saved with version: {plaid_version}" - ) - data["version"] = str(plaid_version) - else: - data["version"] = str(self._version) - - return data - - # Handle version - plaid_version = Version(plaid.__version__) - if self._version != plaid_version: # pragma: no cover - logger.warning( - f"Version mismatch: ProblemDefinition was loaded from version {self._version if self._version is 
not None else 'anterior to 0.1.10'}, and will be saved with version: {plaid_version}" - ) - data["version"] = str(plaid_version) - else: - data["version"] = str(self._version) - - # Save infos - + # Persistence ----------------------------------------------------------- def save_to_file(self, path: Union[str, Path]) -> None: - """Save problem information, inputs, outputs, and split to the specified file in YAML format. - - Args: - path (Union[str,Path]): The filepath where the problem information will be saved. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - problem.save_to_file("/path/to/save_file") - """ - problem_infos_dict = self._generate_problem_infos_dict() - + """Persist the problem definition to a single YAML file.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) - if path.suffix != ".yaml": path = path.with_suffix(".yaml") - - # Save infos with path.open("w") as file: - yaml.dump( - problem_infos_dict, file, default_flow_style=False, sort_keys=True - ) - - @deprecated( - "`ProblemDefinition._save_to_dir_(...)` is deprecated. Use `ProblemDefinition.save_to_dir(...)` instead.", - version="0.1.10", - removal="0.2.0", - ) - def _save_to_dir_(self, path: Union[str, Path]) -> None: - """DEPRECATED: use :meth:`ProblemDefinition.save_to_dir` instead.""" - self.save_to_dir(path) + yaml.dump(self.model_dump(exclude_none=True), file, sort_keys=True) def save_to_dir(self, path: Union[str, Path]) -> None: - """Save problem information, inputs, outputs, and split to the specified directory in YAML and CSV formats. - - Args: - path (Union[str,Path]): The directory where the problem information will be saved. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - problem.save_to_dir("/path/to/save_directory") - """ + """Persist the problem definition to a directory (single YAML).""" path = Path(path) - - if not (path.is_dir()): - path.mkdir(parents=True) - - problem_infos_dict = self._generate_problem_infos_dict() - - # Save infos - pbdef_fname = path / "problem_infos.yaml" - with pbdef_fname.open("w") as file: - yaml.dump( - problem_infos_dict, file, default_flow_style=False, sort_keys=True - ) - - # Save split - split_fname = path / "split.json" - if self.get_split() is not None: - with split_fname.open("w") as file: - json.dump(self.get_split(), file) - - # # Save split - # split_fname = path / "train_split.json" - # if self.get_train_split() is not None: - # with split_fname.open("w") as file: - # json.dump(self.get_train_split(), file) - - # split_fname = path / "test_split.json" - # if self.get_test_split() is not None: - # with split_fname.open("w") as file: - # json.dump(self.get_test_split(), file) + path.mkdir(parents=True, exist_ok=True) + self.save_to_file(path / "problem_infos.yaml") @classmethod - def load(cls, path: Union[str, Path]) -> Self: # pragma: no cover - """Load data from a specified directory. - - Args: - path (Union[str,Path]): The path from which to load files. - - Returns: - Self: The loaded dataset (Dataset). 
- """ - instance = cls() - instance._load_from_dir_(path) - return instance - - def _initialize_from_problem_infos_dict( - self, data: dict[str, Union[str, list]] - ) -> None: - if "version" not in data: - self._version = None - else: - self._version = Version(data["version"]) - self._task = data["task"] - self.in_features_identifiers = [] - if "input_features" in data: - for tup in data["input_features"]: - if isinstance(tup, dict): - self.in_features_identifiers.append(FeatureIdentifier(**tup)) - else: - self.in_features_identifiers.append(tup) - self.out_features_identifiers = [] - if "output_features" in data: - for tup in data["output_features"]: - if isinstance(tup, dict): - self.out_features_identifiers.append(FeatureIdentifier(**tup)) - else: - self.out_features_identifiers.append(tup) - self.constant_features_identifiers = [] - if "constant_features" in data: - for tup in data["constant_features"]: - self.constant_features_identifiers.append(tup) - if "version" not in data or Version(data["version"]) < Version("0.2.0"): - self.in_scalars_names = data.get("input_scalars", []) - self.out_scalars_names = data.get("output_scalars", []) - self.in_fields_names = data.get("input_fields", []) - self.out_fields_names = data.get("output_fields", []) - self.in_timeseries_names = data.get("input_timeseries", []) - self.out_timeseries_names = data.get("output_timeseries", []) - self.in_meshes_names = data.get("input_meshes", []) - self.out_meshes_names = data.get("output_meshes", []) - else: # pragma: no cover - old_keys = [ - "input_scalars", - "input_fields", - "input_timeseries", - "input_meshes", - "output_scalars", - "output_fields", - "output_timeseries", - "output_meshes", - ] - for k in old_keys: - if k in data: - logger.warning( - f"Key '{k}' is deprecated and will be ignored. You should convert your ProblemDefinition using FeatureIdentifiers to identify features instead of names." 
- ) - if "score_function" in data: - self._score_function = data["score_function"] - if "train_split" in data: - self._train_split = data["train_split"] - if "test_split" in data: - self._test_split = data["test_split"] - if "name" in data: - self._name = data["name"] - - def _load_from_file_(self, path: Union[str, Path]) -> None: - """Load problem information, inputs, outputs, and split from the specified file in YAML format. - - Args: - path (Union[str,Path]): The filepath from which to load the problem information. - - Raises: - FileNotFoundError: Triggered if the provided file does not exist. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - problem._load_from_file_("/path/to/load_file") - """ + def load(cls, path: Union[str, Path]) -> "ProblemDefinition": + """Load a problem definition from a file or directory.""" path = Path(path) + if path.is_dir(): + return cls._load_from_dir(path) + return cls._load_from_file(path) + @classmethod + def _load_from_file(cls, path: Union[str, Path]) -> "ProblemDefinition": + """Load a problem definition from a YAML file.""" + path = Path(path) if path.suffix != ".yaml": path = path.with_suffix(".yaml") - if not path.exists(): raise FileNotFoundError(f'File "{path}" does not exist. Abort') - with path.open("r") as file: - data = yaml.safe_load(file) - - self._initialize_from_problem_infos_dict(data) - - def _load_from_dir_(self, path: Union[str, Path]) -> None: - """Load problem information, inputs, outputs, and split from the specified directory in YAML and CSV formats. - - Args: - path (Union[str,Path]): The directory from which to load the problem information. + data = yaml.safe_load(file) or {} + return cls.model_validate(data) - Raises: - FileNotFoundError: Triggered if the provided directory or file problem_infos.yaml does not exist - FileExistsError: Triggered if the provided path is a file instead of a directory. - - Example: - .. 
code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - problem._load_from_dir_("/path/to/load_directory") - """ + @classmethod + def _load_from_dir(cls, path: Union[str, Path]) -> "ProblemDefinition": + """Load a problem definition from a directory layout.""" path = Path(path) - if not path.exists(): raise FileNotFoundError(f'Directory "{path}" does not exist. Abort') - if not path.is_dir(): raise FileExistsError(f'"{path}" is not a directory. Abort') pbdef_fname = path / "problem_infos.yaml" - data = {} # To avoid crash if pbdef_fname does not exist - if pbdef_fname.is_file(): - with pbdef_fname.open("r") as file: - data = yaml.safe_load(file) - else: + if not pbdef_fname.is_file(): raise FileNotFoundError( f"file with path `{pbdef_fname}` does not exist. Abort" ) + with pbdef_fname.open("r") as file: + data = yaml.safe_load(file) or {} + + if "split" not in data: + split_json = path / "split.json" + if split_json.is_file(): + with split_json.open("r") as file: + data["split"] = json.load(file) + else: + split_csv = path / "split.csv" + if split_csv.is_file(): # pragma: no cover + import csv as _csv - self._initialize_from_problem_infos_dict(data) - - # if it was saved with version <=0.1.7 it is a .csv else it is .json - split = {} - split_fname_csv = path / "split.csv" - split_fname_json = path / "split.json" - if split_fname_json.is_file(): - with split_fname_json.open("r") as file: - split = json.load(file) - if split_fname_csv.is_file(): # pragma: no cover - logger.warning( - f"Both files with path `{split_fname_csv}` and `{split_fname_json}` exist. 
JSON file is the standard from 0.1.7 -> CSV file will be ignored" - ) - elif split_fname_csv.is_file(): # pragma: no cover - with split_fname_csv.open("r") as file: - reader = csv.reader(file, delimiter=",") - for row in reader: - split[row[0]] = [int(i) for i in row[1:]] - else: # pragma: no cover - logger.warning( - f"file with path `{split_fname_csv}` or `{split_fname_json}` does not exist. Splits will not be set" - ) - self.set_split(split) - - def extract_problem_definition_from_identifiers( - self, identifiers: Sequence[Union[str, FeatureIdentifier]] - ) -> Self: - """Create a new ProblemDefinition restricted to a subset of feature identifiers. - - Args: - identifiers (Sequence[Union[str, FeatureIdentifier]]): List of identifiers to keep. - - Returns: - ProblemDefinition: A new :class:`ProblemDefinition` instance. - """ - new_problem_definition = ProblemDefinition() - if self._task is not None: - new_problem_definition.set_task(self.get_task()) - if self._name is not None: - new_problem_definition.set_name(self.get_name()) - - in_features = self.filter_in_features_identifiers(identifiers) - if len(in_features) > 0: - new_problem_definition.add_in_features_identifiers(in_features) - - out_features = self.filter_out_features_identifiers(identifiers) - if len(out_features) > 0: - new_problem_definition.add_out_features_identifiers(out_features) - - if self.get_split() is not None: - new_problem_definition.set_split(self.get_split()) + split: dict[str, list[int]] = {} + with split_csv.open("r") as file: + reader = _csv.reader(file, delimiter=",") + for row in reader: + split[row[0]] = [int(i) for i in row[1:]] + data["split"] = split - return new_problem_definition + return cls.model_validate(data) - # -------------------------------------------------------------------------# + # Representation -------------------------------------------------------- def __repr__(self) -> str: - """Return a string representation of the problem. 
- - Returns: - str: A string representation of the overview of problem content. - - Example: - .. code-block:: python - - from plaid import ProblemDefinition - problem = ProblemDefinition() - # [...] - print(problem) - >>> ProblemDefinition(input_scalars_names=['s_1'], output_scalars_names=['s_2'], input_meshes_names=['mesh'], task='regression', split_names=['train', 'val']) - """ - str_repr = "ProblemDefinition(" - - # ---# features - if len(self.in_features_identifiers) > 0: - in_features_identifiers = self.in_features_identifiers - str_repr += f"{in_features_identifiers=}, " - if len(self.out_features_identifiers) > 0: - out_features_identifiers = self.out_features_identifiers - str_repr += f"{out_features_identifiers=}, " - - # ---# scalars - if len(self.in_scalars_names) > 0: - input_scalars_names = self.in_scalars_names - str_repr += f"{input_scalars_names=}, " - if len(self.out_scalars_names) > 0: - output_scalars_names = self.out_scalars_names - str_repr += f"{output_scalars_names=}, " - # ---# fields - if len(self.in_fields_names) > 0: - input_fields_names = self.in_fields_names - str_repr += f"{input_fields_names=}, " - if len(self.out_fields_names) > 0: - output_fields_names = self.out_fields_names - str_repr += f"{output_fields_names=}, " - # ---# timeseries - if len(self.in_timeseries_names) > 0: - input_timeseries_names = self.in_timeseries_names - str_repr += f"{input_timeseries_names=}, " - if len(self.out_timeseries_names) > 0: - output_timeseries_names = self.out_timeseries_names - str_repr += f"{output_timeseries_names=}, " - # ---# meshes - if len(self.in_meshes_names) > 0: - input_meshes_names = self.in_meshes_names - str_repr += f"{input_meshes_names=}, " - if len(self.out_meshes_names) > 0: - output_meshes_names = self.out_meshes_names - str_repr += f"{output_meshes_names=}, " - # ---# task - if self._task is not None: - task = self._task - str_repr += f"{task=}, " - # ---# split - if self._split is not None: - split_names = 
list(self._split.keys()) - str_repr += f"{split_names=}, " - - if str_repr[-2:] == ", ": - str_repr = str_repr[:-2] - str_repr += ")" - return str_repr + """Return a concise string representation of the problem definition.""" + pieces = [] + if self.input_features: + pieces.append(f"input_features={self.input_features}") + if self.output_features: + pieces.append(f"output_features={self.output_features}") + if self.constant_features: + pieces.append(f"constant_features={self.constant_features}") + if self.task: + pieces.append(f"task='{self.task}'") + if self.split: + pieces.append(f"split_names={list(self.split.keys())}") + if self.name: + pieces.append(f"name='{self.name}'") + joined = ", ".join(pieces) + return f"ProblemDefinition({joined})" diff --git a/src/plaid/storage/common/reader.py b/src/plaid/storage/common/reader.py index c8e05f88..a7022831 100644 --- a/src/plaid/storage/common/reader.py +++ b/src/plaid/storage/common/reader.py @@ -61,9 +61,7 @@ def load_problem_definitions_from_disk( pb_defs = [] for p in pb_def_dir.iterdir(): if p.is_file(): - pb_def = ProblemDefinition() - pb_def._load_from_file_(pb_def_dir / Path(p.name)) - pb_defs.append(pb_def) + pb_defs.append(ProblemDefinition.load(p)) return pb_defs else: logger.warning("No problem definitions found on disk.") diff --git a/tests/bridges/test_huggingface_bridge.py b/tests/bridges/test_huggingface_bridge.py index 0fd8f702..67c568f3 100644 --- a/tests/bridges/test_huggingface_bridge.py +++ b/tests/bridges/test_huggingface_bridge.py @@ -37,7 +37,7 @@ def dataset(samples, infos) -> Dataset: def problem_definition() -> ProblemDefinition: problem_definition = ProblemDefinition() problem_definition.set_task("regression") - problem_definition.add_input_scalars_names(["feature_name_1", "feature_name_2"]) + problem_definition.add_in_features_identifiers(["feature_name_1", "feature_name_2"]) problem_definition.set_split({"train": [0, 2], "test": [1, 3]}) return problem_definition diff --git 
a/tests/problem_definition/problem_infos.yaml b/tests/problem_definition/problem_infos.yaml index 7ee29383..903278e2 100644 --- a/tests/problem_definition/problem_infos.yaml +++ b/tests/problem_definition/problem_infos.yaml @@ -6,6 +6,14 @@ input_features: name: test_feature - type: scalar name: feature +split: + test: + - 3 + - 4 + train: + - 0 + - 1 + - 2 output_features: - type: scalar name: predict_feature @@ -13,35 +21,3 @@ output_features: name: test_feature - type: scalar name: feature -input_scalars: -- predict_scalar -- scalar -- test_scalar -output_scalars: -- scalar -- test_scalar -- predict_scalar -input_fields: -- field -- predict_field -- test_field -output_fields: -- field -- predict_field -- test_field -input_timeseries: -- predict_timeseries -- test_timeseries -- timeseries -output_timeseries: -- timeseries -- test_timeseries -- predict_timeseries -input_meshes: -- mesh -- predict_mesh -- test_mesh -output_meshes: -- mesh -- test_mesh -- predict_mesh diff --git a/tests/problem_definition/split.json b/tests/problem_definition/split.json deleted file mode 100644 index 39cf61aa..00000000 --- a/tests/problem_definition/split.json +++ /dev/null @@ -1 +0,0 @@ -{"train": [0, 1, 2], "test": [3, 4]} \ No newline at end of file diff --git a/tests/storage/test_storage.py b/tests/storage/test_storage.py index 63bfbb91..75a8e4cb 100644 --- a/tests/storage/test_storage.py +++ b/tests/storage/test_storage.py @@ -56,7 +56,7 @@ def main_splits() -> dict: def problem_definition(main_splits) -> ProblemDefinition: problem_definition = ProblemDefinition() problem_definition.set_task("regression") - problem_definition.add_input_scalars_names(["feature_name_1", "feature_name_2"]) + problem_definition.add_in_features_identifiers(["feature_name_1", "feature_name_2"]) problem_definition.set_split(main_splits) return problem_definition diff --git a/tests/test_problem_definition.py b/tests/test_problem_definition.py index a0b79885..1054c3d3 100644 --- 
a/tests/test_problem_definition.py +++ b/tests/test_problem_definition.py @@ -5,10 +5,6 @@ # # -# %% Imports - -import os -import subprocess from pathlib import Path import pytest @@ -18,8 +14,6 @@ from plaid.containers import FeatureIdentifier from plaid.problem_definition import ProblemDefinition -# %% Fixtures - @pytest.fixture() def problem_definition() -> ProblemDefinition: @@ -31,7 +25,6 @@ def problem_definition_full(problem_definition: ProblemDefinition) -> ProblemDef problem_definition.set_task("regression") problem_definition.set_name("regression_1") - # ---- feature_identifier = FeatureIdentifier({"type": "scalar", "name": "feature"}) predict_feature_identifier = FeatureIdentifier( {"type": "scalar", "name": "predict_feature"} @@ -47,51 +40,30 @@ def problem_definition_full(problem_definition: ProblemDefinition) -> ProblemDef [predict_feature_identifier, test_feature_identifier] ) problem_definition.add_out_feature_identifier(feature_identifier) - # ---- - feature_identifier = "Base_2_2/Zone/PointData/U1" - predict_feature_identifier = "Base_2_2/Zone/PointData/U2" - test_feature_identifier = "Base_2_2/Zone/PointData/sig12" + + str_feature = "Base_2_2/Zone/PointData/U1" + predict_str_feature = "Base_2_2/Zone/PointData/U2" + test_str_feature = "Base_2_2/Zone/PointData/sig12" problem_definition.add_in_features_identifiers( - [predict_feature_identifier, test_feature_identifier] + [predict_str_feature, test_str_feature] ) - problem_definition.add_in_feature_identifier(feature_identifier) + problem_definition.add_in_feature_identifier(str_feature) problem_definition.add_out_features_identifiers( - [predict_feature_identifier, test_feature_identifier] + [predict_str_feature, test_str_feature] ) - problem_definition.add_constant_feature_identifier(feature_identifier) + problem_definition.add_constant_feature_identifier(str_feature) problem_definition.add_constant_features_identifiers( - [predict_feature_identifier, test_feature_identifier] + 
[predict_str_feature, test_str_feature] ) - # ---- - problem_definition.add_input_scalars_names(["scalar", "test_scalar"]) - problem_definition.add_input_scalar_name("predict_scalar") - problem_definition.add_output_scalars_names(["scalar", "test_scalar"]) - problem_definition.add_output_scalar_name("predict_scalar") - - problem_definition.add_input_fields_names(["field", "test_field"]) - problem_definition.add_input_field_name("predict_field") - problem_definition.add_output_fields_names(["field", "test_field"]) - problem_definition.add_output_field_name("predict_field") - - problem_definition.add_input_timeseries_names(["timeseries", "test_timeseries"]) - problem_definition.add_input_timeseries_name("predict_timeseries") - problem_definition.add_output_timeseries_names(["timeseries", "test_timeseries"]) - problem_definition.add_output_timeseries_name("predict_timeseries") - - problem_definition.add_input_meshes_names(["mesh", "test_mesh"]) - problem_definition.add_input_mesh_name("predict_mesh") - problem_definition.add_output_meshes_names(["mesh", "test_mesh"]) - problem_definition.add_output_mesh_name("predict_mesh") - new_split = {"train": [0, 1, 2], "test": [3, 4]} problem_definition.set_split(new_split) - new_split = {"train_1": [0, 1, 2], "train_2": "all"} - problem_definition.set_train_split(new_split) + new_train_split = {"train_1": {"train": [0, 1]}, "train_2": {"train": "all"}} + problem_definition.set_train_split(new_train_split) - new_split = {"test_1": "all", "test_2": [0, 2]} - problem_definition.set_test_split(new_split) + new_test_split = {"test_1": {"test": "all"}, "test_2": {"test": [0, 2]}} + problem_definition.set_test_split(new_test_split) return problem_definition @@ -101,84 +73,33 @@ def current_directory() -> Path: return Path(__file__).absolute().parent -@pytest.fixture(scope="session", autouse=True) -def clean_tests(): - base_dir = Path(__file__).absolute().parent - if os.name == "nt": - # Windows - script_path = base_dir / "clean.bat" - 
retcode = subprocess.call(["cmd", "/c", str(script_path)]) - else: - # Unix - script_path = base_dir / "clean.sh" - retcode = subprocess.call(["sh", str(script_path)]) - assert retcode == 0, "Test cleanup script failed" - - -# %% Tests - - -class Test_ProblemDefinition: - def test__init__(self, problem_definition): +class TestProblemDefinition: + def test_init(self, problem_definition: ProblemDefinition): assert problem_definition.get_task() is None - print(problem_definition) - - def test__init__path(self, current_directory): - d_path = current_directory / "problem_definition" - ProblemDefinition(path=d_path) - - def test__init__directory_path(self, current_directory): - d_path = current_directory / "problem_definition" - ProblemDefinition(directory_path=d_path) + assert problem_definition.get_version() == Version(plaid.__version__) - def test__init__both_path_and_directory_path(self, current_directory): + def test_load_from_dir(self, current_directory: Path): d_path = current_directory / "problem_definition" - with pytest.raises(ValueError): - ProblemDefinition(path=d_path, directory_path=d_path) - - # -------------------------------------------------------------------------# - def test_version(self, problem_definition): - # Unauthorized version - assert problem_definition.get_version() == Version(plaid.__version__) + pb = ProblemDefinition.load(d_path) + assert isinstance(pb, ProblemDefinition) - # -------------------------------------------------------------------------# - def test_task(self, problem_definition): - # Unauthorized task + def test_task(self, problem_definition: ProblemDefinition): with pytest.raises(TypeError): - problem_definition.set_task("ighyurgv") + problem_definition.set_task("not_valid") problem_definition.set_task("classification") with pytest.raises(ValueError): problem_definition.set_task("regression") assert problem_definition.get_task() == "classification" - print(problem_definition) - # 
-------------------------------------------------------------------------# - def test_score_function(self, problem_definition): - # Unauthorized task + def test_score_function(self, problem_definition: ProblemDefinition): with pytest.raises(TypeError): - problem_definition.set_score_function("ighyurgv") + problem_definition.set_score_function("not_valid") problem_definition.set_score_function("RRMSE") with pytest.raises(ValueError): problem_definition.set_score_function("RRMSE") assert problem_definition.get_score_function() == "RRMSE" - print(problem_definition) - - # -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# - # -------------------------------------------------------------------------# - def test_get_in_features_identifiers(self, problem_definition): - assert problem_definition.get_in_features_identifiers() == [] - - def test_add_in_features_identifiers_fail_same_identifier(self, problem_definition): - dummy_identifier = FeatureIdentifier({"type": "scalar", "name": "dummy"}) - with pytest.raises(ValueError): - problem_definition.add_in_features_identifiers( - [dummy_identifier, dummy_identifier] - ) - problem_definition.add_in_feature_identifier(dummy_identifier) - with pytest.raises(ValueError): - problem_definition.add_in_feature_identifier(dummy_identifier) - def test_add_in_features_identifiers(self, problem_definition): + def test_add_in_features_identifiers(self, problem_definition: ProblemDefinition): dummy_identifier_1 = FeatureIdentifier({"type": "scalar", "name": "dummy_1"}) dummy_identifier_2 = FeatureIdentifier({"type": "scalar", "name": "dummy_2"}) dummy_identifier_3 = FeatureIdentifier({"type": "scalar", "name": "dummy_3"}) @@ -188,26 +109,15 @@ def test_add_in_features_identifiers(self, problem_definition): problem_definition.add_in_feature_identifier(dummy_identifier_3) inputs = problem_definition.get_in_features_identifiers() assert len(inputs) == 3 - assert set(inputs) == set( - [dummy_identifier_1, 
dummy_identifier_2, dummy_identifier_3] - ) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_out_features_identifiers(self, problem_definition): - assert problem_definition.get_out_features_identifiers() == [] - - def test_add_out_features_identifiers_fail(self, problem_definition): - dummy_identifier = FeatureIdentifier({"type": "scalar", "name": "dummy"}) - with pytest.raises(ValueError): - problem_definition.add_out_features_identifiers( - [dummy_identifier, dummy_identifier] - ) - problem_definition.add_out_feature_identifier(dummy_identifier) + assert set(inputs) == { + dummy_identifier_1, + dummy_identifier_2, + dummy_identifier_3, + } with pytest.raises(ValueError): - problem_definition.add_out_feature_identifier(dummy_identifier) + problem_definition.add_in_feature_identifier(dummy_identifier_1) - def test_add_out_features_identifiers(self, problem_definition): + def test_add_out_features_identifiers(self, problem_definition: ProblemDefinition): dummy_identifier_1 = FeatureIdentifier({"type": "scalar", "name": "dummy_1"}) dummy_identifier_2 = FeatureIdentifier({"type": "scalar", "name": "dummy_2"}) dummy_identifier_3 = FeatureIdentifier({"type": "scalar", "name": "dummy_3"}) @@ -217,534 +127,55 @@ def test_add_out_features_identifiers(self, problem_definition): problem_definition.add_out_feature_identifier(dummy_identifier_3) outputs = problem_definition.get_out_features_identifiers() assert len(outputs) == 3 - assert set(outputs) == set( - [dummy_identifier_1, dummy_identifier_2, dummy_identifier_3] - ) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_constant_features_identifiers(self, problem_definition): - assert problem_definition.get_constant_features_identifiers() == [] - - def test_add_constant_features_identifiers_fail(self, problem_definition): - dummy_identifier = FeatureIdentifier({"type": 
"scalar", "name": "dummy"}) - with pytest.raises(ValueError): - problem_definition.add_constant_features_identifiers( - [dummy_identifier, dummy_identifier] - ) - problem_definition.add_constant_feature_identifier(dummy_identifier) - with pytest.raises(ValueError): - problem_definition.add_constant_feature_identifier(dummy_identifier) - - def test_add_constant_features_identifiers(self, problem_definition): - dummy_identifier_1 = FeatureIdentifier({"type": "scalar", "name": "dummy_1"}) - dummy_identifier_2 = FeatureIdentifier({"type": "scalar", "name": "dummy_2"}) - dummy_identifier_3 = FeatureIdentifier({"type": "scalar", "name": "dummy_3"}) + assert set(outputs) == { + dummy_identifier_1, + dummy_identifier_2, + dummy_identifier_3, + } + with pytest.raises(ValueError): + problem_definition.add_out_feature_identifier(dummy_identifier_1) + + def test_constant_features(self, problem_definition: ProblemDefinition): + dummy_identifier_1 = "Base_2_2/Zone/PointData/U1" + dummy_identifier_2 = "Base_2_2/Zone/PointData/U2" + dummy_identifier_3 = "Base_2_2/Zone/PointData/sig12" problem_definition.add_constant_features_identifiers( [dummy_identifier_1, dummy_identifier_2] ) problem_definition.add_constant_feature_identifier(dummy_identifier_3) constants = problem_definition.get_constant_features_identifiers() assert len(constants) == 3 - assert set(constants) == set( - [dummy_identifier_1, dummy_identifier_2, dummy_identifier_3] - ) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_filter_features_identifiers(self, current_directory): - d_path = current_directory / "problem_definition" - problem = ProblemDefinition(d_path) - predict_feature_identifier = FeatureIdentifier( - {"type": "scalar", "name": "predict_feature"} - ) - test_feature_identifier = FeatureIdentifier( - {"type": "scalar", "name": "test_feature"} - ) - filter_in = problem.filter_in_features_identifiers( - [predict_feature_identifier, 
test_feature_identifier] - ) - filter_out = problem.filter_out_features_identifiers( - [predict_feature_identifier, test_feature_identifier] - ) - filter_cte = problem.filter_constant_features_identifiers( - [predict_feature_identifier, test_feature_identifier] - ) - filter_cte - assert len(filter_in) == 2 and filter_in == [ - predict_feature_identifier, - test_feature_identifier, - ] - assert filter_in != [test_feature_identifier, predict_feature_identifier], ( - "common inputs not sorted" - ) - - assert len(filter_out) == 2 and filter_out == [ - predict_feature_identifier, - test_feature_identifier, - ] - assert filter_out != [test_feature_identifier, predict_feature_identifier], ( - "common outputs not sorted" - ) - - inexisting_feature_identifier = FeatureIdentifier( - {"type": "scalar", "name": "inexisting_feature"} - ) - fail_filter_in = problem.filter_in_features_identifiers( - [inexisting_feature_identifier] - ) - fail_filter_out = problem.filter_out_features_identifiers( - [inexisting_feature_identifier] - ) - fail_filter_cte = problem.filter_constant_features_identifiers( - ["Base_2_2/Zone/PointData/inexisting_feature"] - ) - - assert fail_filter_in == [] - assert fail_filter_out == [] - assert fail_filter_cte == [] - - # -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# - # -------------------------------------------------------------------------# - def test_get_input_scalars_names(self, problem_definition): - assert problem_definition.get_input_scalars_names() == [] - - def test_add_input_scalars_names_fail_same_name(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_input_scalars_names(["feature_name", "feature_name"]) - problem_definition.add_input_scalar_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_input_scalar_name("feature_name") - - def test_add_input_scalars_names(self, problem_definition): - problem_definition.add_input_scalars_names(["scalar", 
"test_scalar"]) - problem_definition.add_input_scalar_name("predict_scalar") - inputs = problem_definition.get_input_scalars_names() - assert len(inputs) == 3 - assert set(inputs) == set(["predict_scalar", "scalar", "test_scalar"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_output_scalars_names(self, problem_definition): - assert problem_definition.get_output_scalars_names() == [] - - def test_add_output_scalars_names_fail(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_output_scalars_names( - ["feature_name", "feature_name"] - ) - problem_definition.add_output_scalar_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_output_scalar_name("feature_name") - - def test_add_output_scalars_names(self, problem_definition): - problem_definition.add_output_scalars_names(["scalar", "test_scalar"]) - problem_definition.add_output_scalar_name("predict_scalar") - outputs = problem_definition.get_output_scalars_names() - assert len(outputs) == 3 - assert set(outputs) == set(["predict_scalar", "scalar", "test_scalar"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_filter_scalars_names(self, current_directory): - d_path = current_directory / "problem_definition" - problem = ProblemDefinition(d_path) - filter_in = problem.filter_input_scalars_names( - ["predict_scalar", "test_scalar"] - ) - filter_out = problem.filter_output_scalars_names( - ["predict_scalar", "test_scalar"] - ) - assert len(filter_in) == 2 and filter_in == ["predict_scalar", "test_scalar"] - assert filter_in != ["test_scalar", "predict_scalar"], ( - "common inputs not sorted" - ) - - assert len(filter_out) == 2 and filter_out == ["predict_scalar", "test_scalar"] - assert filter_out != ["test_scalar", "predict_scalar"], ( - "common outputs not sorted" - ) - - fail_filter_in = 
problem.filter_input_scalars_names(["a_scalar"]) - fail_filter_out = problem.filter_output_scalars_names(["b_scalar"]) - - assert fail_filter_in == [] - assert fail_filter_out == [] - - # -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# - # -------------------------------------------------------------------------# - def test_get_input_fields_names(self, problem_definition): - assert problem_definition.get_input_fields_names() == [] - - def test_add_input_fields_names_fail_same_name(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_input_fields_names(["feature_name", "feature_name"]) - problem_definition.add_input_field_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_input_field_name("feature_name") - - def test_add_input_fields_names(self, problem_definition): - problem_definition.add_input_fields_names(["field", "test_field"]) - problem_definition.add_input_field_name("predict_field") - inputs = problem_definition.get_input_fields_names() - assert len(inputs) == 3 - assert set(inputs) == set(["predict_field", "field", "test_field"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_output_fields_names(self, problem_definition): - assert problem_definition.get_output_fields_names() == [] - - def test_add_output_fields_names_fail(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_output_fields_names(["feature_name", "feature_name"]) - problem_definition.add_output_field_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_output_field_name("feature_name") - - def test_add_output_fields_names(self, problem_definition): - problem_definition.add_output_fields_names(["field", "test_field"]) - problem_definition.add_output_field_name("predict_field") - outputs = problem_definition.get_output_fields_names() - assert len(outputs) == 3 - assert 
set(outputs) == set(["predict_field", "field", "test_field"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_filter_fields_names(self, current_directory): - d_path = current_directory / "problem_definition" - problem = ProblemDefinition(d_path) - filter_in = problem.filter_input_fields_names(["predict_field", "test_field"]) - filter_out = problem.filter_output_fields_names(["predict_field", "test_field"]) - assert len(filter_in) == 2 and filter_in == ["predict_field", "test_field"] - assert filter_in != ["test_field", "predict_field"], "common inputs not sorted" - - assert len(filter_out) == 2 and filter_out == ["predict_field", "test_field"] - assert filter_out != ["test_field", "predict_field"], ( - "common outputs not sorted" - ) - - fail_filter_in = problem.filter_input_fields_names(["a_field"]) - fail_filter_out = problem.filter_output_fields_names(["b_field"]) - - assert fail_filter_in == [] - assert fail_filter_out == [] - - # -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# - # -------------------------------------------------------------------------# - def test_get_input_timeseries_names(self, problem_definition): - assert problem_definition.get_input_timeseries_names() == [] - - def test_add_input_timeseries_names_fail_same_name(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_input_timeseries_names( - ["feature_name", "feature_name"] - ) - problem_definition.add_input_timeseries_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_input_timeseries_name("feature_name") - - def test_add_input_timeseries_names(self, problem_definition): - problem_definition.add_input_timeseries_names(["timeseries", "test_timeseries"]) - problem_definition.add_input_timeseries_name("predict_timeseries") - inputs = problem_definition.get_input_timeseries_names() - assert len(inputs) == 3 - assert set(inputs) == set( 
- ["predict_timeseries", "timeseries", "test_timeseries"] - ) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_output_timeseries_names(self, problem_definition): - assert problem_definition.get_output_timeseries_names() == [] - - def test_add_output_timeseries_names_fail(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_output_timeseries_names( - ["feature_name", "feature_name"] - ) - problem_definition.add_output_timeseries_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_output_timeseries_name("feature_name") - - def test_add_output_timeseries_names(self, problem_definition): - problem_definition.add_output_timeseries_names( - ["timeseries", "test_timeseries"] - ) - problem_definition.add_output_timeseries_name("predict_timeseries") - outputs = problem_definition.get_output_timeseries_names() - assert len(outputs) == 3 - assert set(outputs) == set( - ["predict_timeseries", "timeseries", "test_timeseries"] - ) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_filter_timeseries_names(self, current_directory): - d_path = current_directory / "problem_definition" - problem = ProblemDefinition(d_path) - filter_in = problem.filter_input_timeseries_names( - ["predict_timeseries", "test_timeseries"] - ) - filter_out = problem.filter_output_timeseries_names( - ["predict_timeseries", "test_timeseries"] - ) - assert len(filter_in) == 2 and filter_in == [ - "predict_timeseries", - "test_timeseries", - ] - assert filter_in != ["test_timeseries", "predict_timeseries"], ( - "common inputs not sorted" - ) - - assert len(filter_out) == 2 and filter_out == [ - "predict_timeseries", - "test_timeseries", - ] - assert filter_out != ["test_timeseries", "predict_timeseries"], ( - "common outputs not sorted" - ) - - fail_filter_in = 
problem.filter_input_timeseries_names(["a_timeseries"]) - fail_filter_out = problem.filter_output_timeseries_names(["b_timeseries"]) - - assert fail_filter_in == [] - assert fail_filter_out == [] - - # -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# - # -------------------------------------------------------------------------# - def test_get_input_meshes_names(self, problem_definition): - assert problem_definition.get_input_meshes_names() == [] - - def test_add_input_meshes_names_fail_same_name(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_input_meshes_names(["feature_name", "feature_name"]) - problem_definition.add_input_mesh_name("feature_name") + assert set(constants) == { + dummy_identifier_1, + dummy_identifier_2, + dummy_identifier_3, + } with pytest.raises(ValueError): - problem_definition.add_input_mesh_name("feature_name") - - def test_add_input_meshes_names(self, problem_definition): - problem_definition.add_input_meshes_names(["mesh", "test_mesh"]) - problem_definition.add_input_mesh_name("predict_mesh") - inputs = problem_definition.get_input_meshes_names() - assert len(inputs) == 3 - assert set(inputs) == set(["predict_mesh", "mesh", "test_mesh"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_get_output_meshes_names(self, problem_definition): - assert problem_definition.get_output_meshes_names() == [] - - def test_add_output_meshes_names_fail(self, problem_definition): - with pytest.raises(ValueError): - problem_definition.add_output_meshes_names(["feature_name", "feature_name"]) - problem_definition.add_output_mesh_name("feature_name") - with pytest.raises(ValueError): - problem_definition.add_output_mesh_name("feature_name") - - def test_add_output_meshes_names(self, problem_definition): - problem_definition.add_output_meshes_names(["mesh", "test_mesh"]) - problem_definition.add_output_mesh_name("predict_mesh") 
- outputs = problem_definition.get_output_meshes_names() - assert len(outputs) == 3 - assert set(outputs) == set(["predict_mesh", "mesh", "test_mesh"]) - print(problem_definition) - - # -------------------------------------------------------------------------# - def test_filter_meshes_names(self, current_directory): - d_path = current_directory / "problem_definition" - problem = ProblemDefinition(d_path) - print(f"{problem=}") - print(f"{problem.get_input_meshes_names()=}") - filter_in = problem.filter_input_meshes_names(["predict_mesh", "test_mesh"]) - filter_out = problem.filter_output_meshes_names(["predict_mesh", "test_mesh"]) - assert len(filter_in) == 2 and filter_in == ["predict_mesh", "test_mesh"] - assert filter_in != ["test_mesh", "predict_mesh"], "common inputs not sorted" - - assert len(filter_out) == 2 and filter_out == ["predict_mesh", "test_mesh"] - assert filter_out != ["test_mesh", "predict_mesh"], "common outputs not sorted" - - fail_filter_in = problem.filter_input_meshes_names(["a_mesh"]) - fail_filter_out = problem.filter_output_meshes_names(["b_mesh"]) + problem_definition.add_constant_feature_identifier(dummy_identifier_1) - assert fail_filter_in == [] - assert fail_filter_out == [] - - # -------------------------------------------------------------------------# - def test_split(self, problem_definition): + def test_split(self, problem_definition: ProblemDefinition): new_split = {"train": [0, 1, 2], "test": [3, 4]} problem_definition.set_split(new_split) + assert set(problem_definition.get_split().keys()) == {"train", "test"} + assert set(problem_definition.get_all_indices()) == {0, 1, 2, 3, 4} assert problem_definition.get_split("train") == [0, 1, 2] - assert problem_definition.get_split("test") == [3, 4] - - all_split = problem_definition.get_split() - assert all_split["train"] == [0, 1, 2] and all_split["test"] == [3, 4] - assert problem_definition.get_all_indices() == [0, 1, 2, 3, 4] + with pytest.raises(KeyError): + 
problem_definition.get_split("val") - def test_train_split(self, problem_definition): - train_split = {"train1": [0, 1, 2], "train2": [3, 4]} + def test_train_test_split(self, problem_definition: ProblemDefinition): + train_split = {"train_1": {"train": [0, 1]}, "train_2": {"train": "all"}} + test_split = {"test_1": {"test": "all"}, "test_2": {"test": [0, 2]}} problem_definition.set_train_split(train_split) - problem_definition.get_train_split() - assert problem_definition.get_train_split("train1") == [0, 1, 2] - assert problem_definition.get_train_split("train2") == [3, 4] - - def test_test_split(self, problem_definition): - test_split = {"test1": [0, 1, 2], "test2": [3, 4]} problem_definition.set_test_split(test_split) - problem_definition.get_test_split() - assert problem_definition.get_test_split("test1") == [0, 1, 2] - assert problem_definition.get_test_split("test2") == [3, 4] - - # -------------------------------------------------------------------------# - def test__save_to_dir_( - self, problem_definition_full: ProblemDefinition, tmp_path: Path - ): - problem_definition_full._save_to_dir_(tmp_path / "problem_definition") - - def test_save_to_dir( - self, problem_definition_full: ProblemDefinition, tmp_path: Path - ): - problem_definition_full.save_to_dir(tmp_path / "problem_definition") - - def test_load_path_object(self, current_directory): - my_dir = Path(current_directory) - ProblemDefinition(my_dir / "problem_definition") - - def test___init___path( - self, problem_definition_full: ProblemDefinition, tmp_path: Path - ): - d_path = tmp_path / "problem_definition" - problem_definition_full._save_to_dir_(d_path) - # - problem = ProblemDefinition(d_path) - assert problem.get_task() == "regression" - assert set(problem.get_input_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - assert set(problem.get_output_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - all_split = problem.get_split() - assert 
all_split["train"] == [0, 1, 2] and all_split["test"] == [3, 4] - - def test__load_from_dir_( - self, problem_definition_full: ProblemDefinition, tmp_path: Path - ): - d_path = tmp_path / "problem_definition" - problem_definition_full._save_to_dir_(d_path) - # - problem = ProblemDefinition() - problem._load_from_dir_(d_path) - assert problem.get_task() == "regression" - assert set(problem.get_input_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - assert set(problem.get_output_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - all_split = problem.get_split() - assert all_split["train"] == [0, 1, 2] and all_split["test"] == [3, 4] + assert problem_definition.get_train_split("train_1") == {"train": [0, 1]} + assert problem_definition.get_test_split("test_2") == {"test": [0, 2]} + with pytest.raises(KeyError): + problem_definition.get_test_split("missing") - def test__load_from_file_( + def test_save_load_roundtrip( self, problem_definition_full: ProblemDefinition, tmp_path: Path ): - path = tmp_path / "pb_def" - problem_definition_full.save_to_file(path) - # - problem = ProblemDefinition() - problem._load_from_file_(path) - assert problem.get_task() == "regression" - assert set(problem.get_input_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - assert set(problem.get_output_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - - def test_load(self, problem_definition_full: ProblemDefinition, tmp_path: Path): - d_path = tmp_path / "problem_definition" - problem_definition_full._save_to_dir_(d_path) - # - problem = ProblemDefinition.load(d_path) - assert problem.get_task() == "regression" - assert problem.get_name() == "regression_1" - assert set(problem.get_input_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - assert set(problem.get_output_scalars_names()) == set( - ["predict_scalar", "scalar", "test_scalar"] - ) - all_split = 
problem.get_split() - assert all_split["train"] == [0, 1, 2] and all_split["test"] == [3, 4] - - def test__load_from_dir__old_version( - self, problem_definition_full: ProblemDefinition, tmp_path: Path - ): - d_path = tmp_path / "problem_definition" - problem_definition_full._save_to_dir_(d_path) - # Modify the plaid version in saved file - infos_path = d_path / "problem_infos.yaml" - with infos_path.open("r") as f: - text = f.read().splitlines() - text.pop() - text.append("version: 0.1.7") - text.append("") - infos_path.write_text("\n".join(text)) - - # Load the problem definition from the directory - problem = ProblemDefinition.load(d_path) - assert problem.get_version() == Version("0.1.7") - - def test__load_from_dir__empty_dir(self, tmp_path): - problem = ProblemDefinition() - with pytest.raises(FileNotFoundError): - problem._load_from_dir_(tmp_path) - - def test__load_from_dir__non_existing_dir(self): - problem = ProblemDefinition() - non_existing_dir = Path("non_existing_path") - with pytest.raises(FileNotFoundError): - problem._load_from_dir_(non_existing_dir) - - def test__load_from_file__non_existing_file(self): - problem = ProblemDefinition() - non_existing_path = Path("non_existing_path") - with pytest.raises(FileNotFoundError): - problem._load_from_file_(non_existing_path) - - def test__load_from_dir__path_is_file(self, tmp_path): - problem = ProblemDefinition() - file_path = tmp_path / "file.yaml" - file_path.touch() # Create an empty file - with pytest.raises(FileExistsError): - problem._load_from_dir_(file_path) - - def test_extract_problem_definition_from_identifiers(self, problem_definition): - in_id_1 = FeatureIdentifier({"type": "scalar", "name": "in_1"}) - in_id_2 = FeatureIdentifier({"type": "scalar", "name": "in_2"}) - out_id_1 = FeatureIdentifier({"type": "scalar", "name": "out_1"}) - out_id_2 = FeatureIdentifier({"type": "scalar", "name": "out_2"}) - - problem_definition.add_in_features_identifiers([in_id_1, in_id_2]) - 
problem_definition.add_out_features_identifiers([out_id_1, out_id_2]) - problem_definition.set_task("regression") - problem_definition.set_name("regression_1") - with pytest.raises(ValueError): - problem_definition.set_name("regression_2") - problem_definition.set_split({"train": [0, 1], "test": [2, 3]}) - - sub_problem_definition = ( - problem_definition.extract_problem_definition_from_identifiers( - [in_id_1, out_id_1] - ) - ) - - assert sub_problem_definition.get_in_features_identifiers() == [in_id_1] - assert sub_problem_definition.get_out_features_identifiers() == [out_id_1] - assert sub_problem_definition.get_version() == problem_definition.get_version() - assert sub_problem_definition.get_task() == "regression" - assert sub_problem_definition.get_name() == "regression_1" - assert sub_problem_definition.get_split() == {"train": [0, 1], "test": [2, 3]} - - -# %% + out_dir = tmp_path / "pb_def" + problem_definition_full.save_to_dir(out_dir) + reloaded = ProblemDefinition.load(out_dir) + assert reloaded.model_dump() == problem_definition_full.model_dump()