24 commits
badeaaf  Refactor config, raw data task and metadata tasks to have explicit de…  (felixschmitz, Jul 31, 2025)
26cff1b  Refactor dataset merging dependencies.  (felixschmitz, Jul 31, 2025)
741ae92  Fix error handling in dataset merging.  (felixschmitz, Jul 31, 2025)
851dd66  Move to .  (felixschmitz, Jul 31, 2025)
166d02f  Rename mapping-dictionaries in metadata and dataset creation.  (felixschmitz, Aug 4, 2025)
15cf8ec  Addendum to renaming.  (felixschmitz, Aug 4, 2025)
5583763  Fix tests.  (felixschmitz, Aug 4, 2025)
dfbbc54  Merge branch 'main' into make-task-dependencies-explicit  (felixschmitz, Aug 5, 2025)
7064d98  Move error handling of empty inputs into utilities directory.  (felixschmitz, Aug 5, 2025)
bcd246c  Move get_variable_names_in_module function.  (felixschmitz, Aug 7, 2025)
1409733  Add available survey years to variable metadata.  (felixschmitz, Aug 7, 2025)
fbc3372  Re-Add pytask-parallel.  (felixschmitz, Aug 7, 2025)
43d8b09  Fix cleaning variables.  (felixschmitz, Aug 7, 2025)
89ad982  Store merging information in yaml.  (felixschmitz, Aug 7, 2025)
6afdec7  Rename variables and helper functions of metadata tasks.  (felixschmitz, Aug 7, 2025)
d9d9157  Create helper function to move yml from bld to src.  (felixschmitz, Aug 7, 2025)
3a615e2  Introduce module as umbrella-term for DataFrames (containing multiple…  (felixschmitz, Nov 14, 2025)
fb81460  Provisional fix of dataset merging based on metadata mapping in yaml.  (felixschmitz, Nov 14, 2025)
95364ba  Generalize error handling of missing function for script.  (felixschmitz, Nov 17, 2025)
87194c7  Refactor combining variables from multiple modules to take place on m…  (felixschmitz, Nov 17, 2025)
d30e6ac  Keep directory structure around.  (hmgaudecker, Nov 17, 2025)
8858c44  Introduce module structure inside config to loop over tasks in a stat…  (felixschmitz, Nov 17, 2025)
fd74f19  Merge branch 'make-task-dependencies-explicit' of github.com:OpenSour…  (hmgaudecker, Nov 19, 2025)
cab979e  Get rid of pixi deprecation warning.  (hmgaudecker, Nov 19, 2025)
src/soep_preparation/convert_stata_to_pandas/task.py (6 changes: 3 additions & 3 deletions)

@@ -35,13 +35,13 @@ def _iteratively_read_one_data_file(
    return pd.concat(processed_chunks)


-data_file_names = get_data_file_names(
+DATA_FILE_NAMES = get_data_file_names(
Collaborator review comment:
Suggested change:
-DATA_FILE_NAMES = get_data_file_names(
+MODULE_NAMES = get_module_names(
Right?

    directory=SRC / "clean_variables",
    data_root=DATA_ROOT,
    soep_version=SOEP_VERSION,
)

-for data_file_name in data_file_names:
+for data_file_name in DATA_FILE_NAMES:

    @task(id=data_file_name)
    def task_read_one_data_file(
@@ -78,7 +78,7 @@ def task_read_one_data_file(
        )


-if not data_file_names:
+if not DATA_FILE_NAMES:

    @task
    def _raise_no_data_files_found() -> None:
Collaborator review comment:
Suggested change:
-    def _raise_no_data_files_found() -> None:
+    def _raise_no_soep_module_found() -> None:
Collaborator review comment:
Add a link to the (correct section of the) README on Github in the message?
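
For reference, a minimal sketch of how the two review suggestions above could fit together. The `MODULE_NAMES` name follows the suggested rename; the README URL, anchor, and exact message wording are assumptions for illustration, not code from this PR:

from pytask import task

# Hypothetical anchor; the actual README section to link is not part of this PR.
README_URL = "https://github.com/OpenSourceEconomics/soep-preparation#raw-data"

if not MODULE_NAMES:  # assumes the suggested rename of DATA_FILE_NAMES

    @task
    def _raise_no_soep_module_found() -> None:
        """Fail early when no SOEP module is found in the data directory."""
        msg = (
            "No SOEP modules found in the data directory.\n"
            f"See {README_URL} for how to obtain and place the raw data."
        )
        raise FileNotFoundError(msg)
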
src/soep_preparation/create_metadata/task.py (75 changes: 41 additions & 34 deletions)

@@ -11,6 +11,25 @@
)


+def _create_name_to_data_mapping() -> dict[str, pd.DataFrame]:
+    """Mapping of data file and combined variable names to corresponding data."""
+    single_data_files = dict(
+        DATA_CATALOGS["cleaned_variables"]._entries.items()  # noqa: SLF001
+    )
+    combined_variables = dict(
+        DATA_CATALOGS["combined_variables"]._entries.items()  # noqa: SLF001
+    )
+
+    return single_data_files | combined_variables
+
+
+def _create_name_to_metadata_mapping(
+    metadata_names: list[str],
+) -> dict[str, pd.DataFrame]:
+    """Mapping of metadata names to corresponding data."""
+    return {name: DATA_CATALOGS["metadata"][name] for name in metadata_names}
+
+
def _get_index_variables(
    dataset: pd.DataFrame,
    potential_index_variables: list[str],
@@ -33,39 +52,27 @@ def _get_variable_dtypes(
    }


-def _create_metadata_mapping(metadata: dict) -> dict[str, str]:
-    """Create a mapping of column names to data file names.
+def _create_metadata_mapping(data: dict) -> dict[str, str]:
+    """Create a mapping of variable names to metadata file names.

    Args:
-        metadata: A dictionary containing metadata entries.
+        data: A dictionary containing metadata entries.

    Returns:
-        A mapping of variable names to data file names.
+        A mapping of variable names to metadata file names.
    """
    mapping = {}
-    for data_name, data in metadata._entries.items():  # noqa: SLF001
-        if (
-            data_name not in DATA_CATALOGS["combined_variables"]._entries  # noqa: SLF001
-        ) and (
-            data_name not in DATA_CATALOGS["cleaned_variables"]._entries  # noqa: SLF001
-        ):
-            # Skip if data_name is neither among combined variables nor among data files
-            continue
-        variable_names = data.load()["variable_dtypes"].keys()
+    for metadata_name, metadata in data.items():
+        variable_names = metadata["variable_dtypes"].keys()
        for variable_name in variable_names:
-            mapping[variable_name] = data_name
+            mapping[variable_name] = metadata_name
    return mapping


-single_data_files = dict(
-    DATA_CATALOGS["cleaned_variables"]._entries.items()  # noqa: SLF001
-)
-combined_variables = dict(
-    DATA_CATALOGS["combined_variables"]._entries.items()  # noqa: SLF001
-)
+MAPPING_NAME_TO_DATA = _create_name_to_data_mapping()


-for name, data in (single_data_files | combined_variables).items():
+for name, data in MAPPING_NAME_TO_DATA.items():

    @task(id=name)
    def task_create_metadata(
@@ -98,31 +105,31 @@ def task_create_metadata(
        }


-@task(after="task_create_metadata")
+MAPPING_NAME_TO_METADATA = _create_name_to_metadata_mapping(MAPPING_NAME_TO_DATA.keys())
+
+
def task_create_metadata_mapping(
-    single_metadata_mapping: Annotated[dict, DATA_CATALOGS["metadata"]],
+    mapping_name_to_metadata: Annotated[
+        dict[str, pd.DataFrame], MAPPING_NAME_TO_METADATA
+    ],
) -> Annotated[dict[str, dict], DATA_CATALOGS["metadata"]["merged"]]:
    """Create a mapping of variable names to data file names.

    Args:
-        single_metadata_mapping: A dictionary containing single metadata entries.
+        mapping_name_to_metadata: A dictionary containing single metadata entries.

    Returns:
        A mapping of variable names to data file names.

    Raises:
        TypeError: If input data or data name is not of expected type.
    """
-    _error_handling_mapping_task(single_metadata_mapping)
-    return _create_metadata_mapping(single_metadata_mapping)
+    _error_handling_mapping_task(mapping_name_to_metadata)
+    return _create_metadata_mapping(mapping_name_to_metadata)


def _error_handling_mapping_task(mapping: Any) -> None:
-    fail_if_input_has_invalid_type(
-        input_=mapping, expected_dtypes=["_pytask.data_catalog.DataCatalog"]
-    )
-    for data_name, data in mapping._entries.items():  # noqa: SLF001
-        fail_if_input_has_invalid_type(input_=data_name, expected_dtypes=["str"])
-        fail_if_input_has_invalid_type(
-            input_=data, expected_dtypes=["_pytask.nodes.PickleNode"]
-        )
+    fail_if_input_has_invalid_type(input_=mapping, expected_dtypes=["dict"])
+    for metadata_name, metadata in mapping.items():
+        fail_if_input_has_invalid_type(input_=metadata_name, expected_dtypes=["str"])
+        fail_if_input_has_invalid_type(input_=metadata, expected_dtypes=["dict"])
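
For reference, the new dict-based contract of `_create_metadata_mapping` can be illustrated with made-up entries (module and variable names below are hypothetical; real entries come from `DATA_CATALOGS["metadata"]`):

# Input: metadata keyed by module name, each with a "variable_dtypes" dict.
metadata_by_name = {
    "pl": {"variable_dtypes": {"age": "int64", "gross_income": "float64"}},
    "hl": {"variable_dtypes": {"household_size": "int64"}},
}

# Output: the inverted lookup from variable name to metadata file name.
mapping = _create_metadata_mapping(data=metadata_by_name)
assert mapping == {
    "age": "pl",
    "gross_income": "pl",
    "household_size": "hl",
}
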
src/soep_preparation/dataset_merging/helper.py (42 changes: 22 additions & 20 deletions)

@@ -3,6 +3,7 @@
from difflib import get_close_matches

import pandas as pd
+from pytask import PNode, PProvisionalNode

from soep_preparation.config import DATA_CATALOGS, SURVEY_YEARS
from soep_preparation.utilities.error_handling import (
@@ -14,9 +15,9 @@ def create_dataset_from_variables(
    variables: list[str],
    min_and_max_survey_years: tuple[int, int] | None = None,
    survey_years: list[int] | None = None,
-    variable_to_data_file_mapping: dict[str, list[str]] = DATA_CATALOGS["metadata"][
-        "merged"
-    ],
+    mapping_variable_to_data_file: (
+        dict[str, list[str]] | PNode | PProvisionalNode
+    ) = DATA_CATALOGS["metadata"]["merged"],
    merging_behavior: str = "outer",  # make only outer
) -> pd.DataFrame:
    """Create a dataset by merging different specified variables.
@@ -29,7 +30,7 @@
        min_and_max_survey_years: Range of survey years.
        survey_years: Survey years to be included in the dataset.
            Either `survey_years` or `min_and_max_survey_years` must be provided.
-        variable_to_data_file_mapping: A mapping of variable names to dataset names.
+        mapping_variable_to_data_file: A mapping of variable names to dataset names.
            Defaults to `DATA_CATALOGS["metadata"]["merged"]`.
        merging_behavior: The merging behavior to be used.
            Any out of "left", "right", "outer", or "inner".
@@ -55,7 +56,7 @@
        Otherwise, `min_and_max_survey_years=(2024,2025)`
        and `survey_years=[2024, 2025]`
        both return a merged dataset with information from the two survey years.
-        `variable_to_data_file_mapping` is created automatically by the pipeline,
+        `mapping_variable_to_data_file` is created automatically by the pipeline,
        it can be accessed and provided to the function at
        `DATA_CATALOGS["metadata"]["merged"]`.
        Specify `merging_behavior` to control the creation of the dataset
@@ -66,7 +67,7 @@
        For an example see `task_example.py`.
    """
    _error_handling(
-        variable_to_data_file_mapping,
+        mapping_variable_to_data_file,
        variables,
        min_and_max_survey_years,
        survey_years,
@@ -79,7 +80,7 @@
        variables,
    )
    dataset_merging_information = _get_sorted_dataset_merging_information(
-        variable_to_data_file_mapping,
+        mapping_variable_to_data_file,
        variables,
        survey_years,
    )
@@ -91,14 +92,15 @@


def _error_handling(
-    variable_to_data_file_mapping: dict[str, list[str]],
+    mapping_variable_to_data_file: dict[str, list[str]],
    variables: list[str],
    min_and_max_survey_years: tuple[int, int] | None,
    survey_years: list[int] | None,
    merging_behavior: str,
) -> None:
    fail_if_input_has_invalid_type(
-        input_=variable_to_data_file_mapping, expected_dtypes=["dict"]
+        input_=mapping_variable_to_data_file,
+        expected_dtypes=["dict", "PNode", "PProvisionalNode"],
    )
    fail_if_input_has_invalid_type(input_=variables, expected_dtypes=["list"])
    fail_if_input_has_invalid_type(
@@ -108,7 +110,7 @@ def _error_handling(
        input_=survey_years, expected_dtypes=("list", "None")
    )
    fail_if_input_has_invalid_type(input_=merging_behavior, expected_dtypes=["str"])
-    _fail_if_empty(variable_to_data_file_mapping)
+    _fail_if_empty(mapping_variable_to_data_file)
    _fail_if_empty(variables)
    if survey_years is not None:
        _fail_if_survey_years_not_valid(
@@ -120,25 +122,25 @@
        )
    _fail_if_min_larger_max(min_and_max_survey_years)
    _fail_if_invalid_variable(
-        variables=variables, variable_to_data_file_mapping=variable_to_data_file_mapping
+        variables=variables, mapping_variable_to_data_file=mapping_variable_to_data_file
    )
    _fail_if_invalid_merging_behavior(merging_behavior)


def _fail_if_invalid_variable(
    variables: list[str],
-    variable_to_data_file_mapping: dict[str, list[str]],
+    mapping_variable_to_data_file: dict[str, list[str]],
) -> None:
    for variable in variables:
-        if variable not in variable_to_data_file_mapping:
+        if variable not in mapping_variable_to_data_file:
            closest_matches = get_close_matches(
                variable,
-                variable_to_data_file_mapping.keys(),
+                mapping_variable_to_data_file.keys(),
                n=3,
                cutoff=0.6,
            )
            matches = {
-                match: variable_to_data_file_mapping[match] for match in closest_matches
+                match: mapping_variable_to_data_file[match] for match in closest_matches
            }
            msg = f"""variable {variable} not found in any data file.
            The closest matches with the corresponding data files are:
Expand Down Expand Up @@ -178,13 +180,13 @@ def _fail_if_invalid_merging_behavior(merging_behavior: str) -> None:


def _get_data_file_name_to_variables_mapping(
-    variable_to_data_file_mapping: dict[str, str],
+    mapping_variable_to_data_file: dict[str, str],
    variables: list[str],
) -> dict[str, list[str]]:
    data_file_name_to_variables_mapping = {}
    for variable in variables:
-        if variable in variable_to_data_file_mapping:
-            data_file_name = variable_to_data_file_mapping[variable]
+        if variable in mapping_variable_to_data_file:
+            data_file_name = mapping_variable_to_data_file[variable]
            if data_file_name not in data_file_name_to_variables_mapping:
                data_file_name_to_variables_mapping[data_file_name] = []
            data_file_name_to_variables_mapping[data_file_name].append(variable)
@@ -219,12 +221,12 @@ def _fix_user_input(


def _get_sorted_dataset_merging_information(
-    variable_to_data_file_mapping: dict[str, dict],
+    mapping_variable_to_data_file: dict[str, dict],
    variables: list,
    survey_years: list[int],
) -> dict[str, dict]:
    data_mapping = _get_data_file_name_to_variables_mapping(
-        variable_to_data_file_mapping,
+        mapping_variable_to_data_file,
        variables,
    )

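
For reference, a usage sketch of `create_dataset_from_variables` following the docstring notes. Variable names are illustrative; this assumes the pipeline has already produced `DATA_CATALOGS["metadata"]["merged"]`, which the renamed parameter defaults to:

from soep_preparation.dataset_merging.helper import create_dataset_from_variables

# Illustrative variable names; any names present in the merged metadata mapping work.
df_from_range = create_dataset_from_variables(
    variables=["age", "gross_income"],
    min_and_max_survey_years=(2024, 2025),
)
df_from_years = create_dataset_from_variables(
    variables=["age", "gross_income"],
    survey_years=[2024, 2025],
)
# Per the docstring, both calls return the same outer-merged dataset
# covering the two survey years.
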
src/soep_preparation/dataset_merging/task_example.py (10 changes: 4 additions & 6 deletions)

@@ -3,7 +3,6 @@
from typing import Annotated, Any

import pandas as pd
-import pytask

from soep_preparation.config import DATA_CATALOGS, SURVEY_YEARS
from soep_preparation.dataset_merging.helper import create_dataset_from_variables
@@ -22,15 +21,14 @@
]


-@pytask.mark.try_last
def task_merge_variables(
-    variable_to_data_file_mapping: Annotated[dict, DATA_CATALOGS["metadata"]["merged"]],
+    mapping_variable_to_data_file: Annotated[dict, DATA_CATALOGS["metadata"]["merged"]],
    variables: Annotated[list[str], VARIABLES],
) -> Annotated[pd.DataFrame, DATA_CATALOGS["merged"]["example_merged_dataset"]]:
    """Example task merging based on variable names to create dataset.

    Args:
-        variable_to_data_file_mapping: A mapping of variable names to dataset names.
+        mapping_variable_to_data_file: A mapping of variable names to dataset names.
        variables: A list of variable names to be used for merging.

    Returns:
Expand All @@ -39,11 +37,11 @@ def task_merge_variables(
    Raises:
        TypeError: If input mapping or variables is not of expected type.
    """
-    _error_handling_task(mapping=variable_to_data_file_mapping, variables=variables)
+    _error_handling_task(mapping=mapping_variable_to_data_file, variables=variables)
    return create_dataset_from_variables(
        variables=variables,
        min_and_max_survey_years=(min(SURVEY_YEARS), max(SURVEY_YEARS)),
-        variable_to_data_file_mapping=variable_to_data_file_mapping,
+        mapping_variable_to_data_file=mapping_variable_to_data_file,
    )


src/soep_preparation/utilities/general.py (7 changes: 5 additions & 2 deletions)

@@ -12,8 +12,11 @@ def _fail_if_raw_data_file_missing(
) -> None:
    raw_data_file_path = data_root / f"{soep_version}" / f"{script_name}.dta"
    if not raw_data_file_path.exists():
-        msg = f"""Raw data file {raw_data_file_path} not found for SOEP {soep_version}.
-        Ensure the file is present in the data directory for the corresponding wave."""
+        msg = (
+            f"Raw data file {raw_data_file_path} not found for SOEP {soep_version}.\n"
+            f"Ensure the file is present in the data directory\n"
+            f" corresponding to the specified wave."
+        )
        raise FileNotFoundError(msg)


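
For reference, the on-disk layout implied by this message is `{data_root}/{soep_version}/{script_name}.dta`; a sketch with made-up values:

from pathlib import Path

# Made-up values; the real ones come from the project configuration.
data_root = Path("/data/soep")
soep_version = "v39"
script_name = "pl"

raw_data_file_path = data_root / f"{soep_version}" / f"{script_name}.dta"
print(raw_data_file_path)  # /data/soep/v39/pl.dta; FileNotFoundError if absent
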
tests/dataset_merging/test_panel_dataset_creation.py (6 changes: 3 additions & 3 deletions)

@@ -84,7 +84,7 @@ def test_get_data_file_name_to_variables_mapping_assert_type():
        },
    )
    input_ = {
-        "variable_to_data_file_mapping": {
+        "mapping_variable_to_data_file": {
            "column1": "dataset1",
            "column2": "dataset2",
            "column3": "dataset1",
Expand All @@ -101,7 +101,7 @@ def test_get_data_file_name_to_variables_mapping_assert_mapping():
"dataset2": ["column2"],
}
input_ = {
"variable_to_data_file_mapping": {
"mapping_variable_to_data_file": {
"column1": "dataset1",
"column2": "dataset2",
"column3": "dataset1",
@@ -115,7 +115,7 @@
def test_get_data_file_name_to_variables_mapping_assert_datasets():
    expected = ["dataset1", "dataset2"]
    input_ = {
-        "variable_to_data_file_mapping": {
+        "mapping_variable_to_data_file": {
            "column1": "dataset1",
            "column2": "dataset2",
            "column3": "dataset1",