Merge branch 'feature/save_seq_datasets' into 'main'
Add secure saving to SequentialDataset

See merge request ai-lab-pmo/mltools/recsys/RePlay!216
OnlyDeniko committed Aug 6, 2024
2 parents 830e121 + 510d3ae commit fe42d13
Showing 7 changed files with 136 additions and 41 deletions.
1 change: 1 addition & 0 deletions projects/pyproject.toml.template
@@ -135,6 +135,7 @@ max-complexity = 13
"replay/experimental/models/extensions/spark_custom_models/als_extension.py" = ["ARG002", "N802", "N803", "N815"]
"replay/data/nn/sequence_tokenizer.py" = ["ARG003"]
"replay/splitters/base_splitter.py" = ["ARG003"]
"replay/data/nn/sequential_dataset.py" = ["ARG003"]

[tool.tomlsort]
ignore_case = true
42 changes: 42 additions & 0 deletions replay/data/nn/schema.py
@@ -408,6 +408,48 @@ def rating_feature_name(self) -> Optional[str]:
return None
return rating_features.item().name

def _get_object_args(self) -> Dict:
"""
Returns a list of features, each represented as a dictionary.
"""
features = [
{
"name": feature.name,
"feature_type": feature.feature_type.name,
"is_seq": feature.is_seq,
"feature_hint": feature.feature_hint.name if feature.feature_hint else None,
"feature_sources": [
{"source": x.source.name, "column": x.column, "index": x.index} for x in feature.feature_sources
]
if feature.feature_sources
else None,
"cardinality": feature.cardinality if feature.feature_type == FeatureType.CATEGORICAL else None,
"embedding_dim": feature.embedding_dim if feature.feature_type == FeatureType.CATEGORICAL else None,
"tensor_dim": feature.tensor_dim if feature.feature_type == FeatureType.NUMERICAL else None,
}
for feature in self.all_features
]
return features

@classmethod
def _create_object_by_args(cls, args: Dict) -> "TensorSchema":
features_list = []
for feature_data in args:
feature_data["feature_sources"] = (
[
TensorFeatureSource(source=FeatureSource[x["source"]], column=x["column"], index=x["index"])
for x in feature_data["feature_sources"]
]
if feature_data["feature_sources"]
else None
)
f_type = feature_data["feature_type"]
f_hint = feature_data["feature_hint"]
feature_data["feature_type"] = FeatureType[f_type] if f_type else None
feature_data["feature_hint"] = FeatureHint[f_hint] if f_hint else None
features_list.append(TensorFeatureInfo(**feature_data))
return TensorSchema(features_list)

def filter(
self,
name: Optional[str] = None,
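
For illustration, a single entry of the list returned by `_get_object_args` for a sequential categorical item-ID feature would look roughly like the sketch below; the concrete values are assumed, not taken from the diff, and `_create_object_by_args` simply maps such dictionaries back into `TensorFeatureInfo` objects:

# Hypothetical serialized form of one feature; values are illustrative only.
serialized_feature = {
    "name": "item_id",
    "feature_type": "CATEGORICAL",   # FeatureType member name
    "is_seq": True,
    "feature_hint": "ITEM_ID",       # FeatureHint member name, or None
    "feature_sources": [
        {"source": "INTERACTIONS", "column": "item_id", "index": None},
    ],
    "cardinality": 100,              # categorical features only
    "embedding_dim": 64,             # categorical features only
    "tensor_dim": None,              # numerical features only
}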
39 changes: 3 additions & 36 deletions replay/data/nn/sequence_tokenizer.py
@@ -425,18 +425,7 @@ def load(cls, path: str, use_pickle: bool = False, **kwargs) -> "SequenceTokeniz

# load tensor_schema, tensor_features
tensor_schema_data = tokenizer_dict["init_args"]["tensor_schema"]
features_list = []
for feature_data in tensor_schema_data:
feature_data["feature_sources"] = [
TensorFeatureSource(source=FeatureSource[x["source"]], column=x["column"], index=x["index"])
for x in feature_data["feature_sources"]
]
f_type = feature_data["feature_type"]
f_hint = feature_data["feature_hint"]
feature_data["feature_type"] = FeatureType[f_type] if f_type else None
feature_data["feature_hint"] = FeatureHint[f_hint] if f_hint else None
features_list.append(TensorFeatureInfo(**feature_data))
tokenizer_dict["init_args"]["tensor_schema"] = TensorSchema(features_list)
tokenizer_dict["init_args"]["tensor_schema"] = TensorSchema._create_object_by_args(tensor_schema_data)

# Load encoder columns and rules
types = list(FeatureHint) + list(FeatureSource)
@@ -450,7 +439,7 @@ def load(cls, path: str, use_pickle: bool = False, **kwargs) -> "SequenceTokeniz
rule_data = rules_dict[rule]
if rule_data["mapping"] and rule_data["is_int"]:
rule_data["mapping"] = {int(key): value for key, value in rule_data["mapping"].items()}
del rule_data["is_int"]
del rule_data["is_int"]

tokenizer_dict["encoder"]["encoding_rules"][rule] = LabelEncodingRule(**rule_data)

@@ -481,31 +470,9 @@ def save(self, path: str, use_pickle: bool = False) -> None:
"allow_collect_to_master": self._allow_collect_to_master,
"handle_unknown_rule": self._encoder._handle_unknown_rule,
"default_value_rule": self._encoder._default_value_rule,
"tensor_schema": [],
"tensor_schema": self._tensor_schema._get_object_args(),
}

# save tensor schema
for feature in list(self._tensor_schema.values()):
tokenizer_dict["init_args"]["tensor_schema"].append(
{
"name": feature.name,
"feature_type": feature.feature_type.name,
"is_seq": feature.is_seq,
"feature_hint": feature.feature_hint.name if feature.feature_hint else None,
"feature_sources": [
{"source": x.source.name, "column": x.column, "index": x.index}
for x in feature.feature_sources
]
if feature.feature_sources
else None,
"cardinality": feature.cardinality if feature.feature_type == FeatureType.CATEGORICAL else None,
"embedding_dim": feature.embedding_dim
if feature.feature_type == FeatureType.CATEGORICAL
else None,
"tensor_dim": feature.tensor_dim if feature.feature_type == FeatureType.NUMERICAL else None,
}
)

# save DatasetLabelEncoder
tokenizer_dict["encoder"] = {
"features_columns": {key.name: value for key, value in self._encoder._features_columns.items()},
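
The tokenizer's save/load path now delegates schema (de)serialization to the new `TensorSchema` helpers instead of building the dictionaries inline. A minimal round-trip sketch, assuming a fitted tokenizer named `tokenizer` and the `save`/`load` signatures shown in the hunk headers above:

# Hedged sketch: persist a fitted SequenceTokenizer without pickle and restore it.
tokenizer.save("./tokenizer")                     # tensor_schema is written via _get_object_args()
restored = SequenceTokenizer.load("./tokenizer")  # tensor_schema is rebuilt via _create_object_by_args()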
58 changes: 58 additions & 0 deletions replay/data/nn/sequential_dataset.py
@@ -1,7 +1,10 @@
import abc
import json
from pathlib import Path
from typing import Tuple, Union

import numpy as np
import pandas as pd
import polars as pl
from pandas import DataFrame as PandasDataFrame
from polars import DataFrame as PolarsDataFrame
@@ -100,6 +103,23 @@ def keep_common_query_ids(
rhs_filtered = rhs.filter_by_query_id(common_queries)
return lhs_filtered, rhs_filtered

def save(self, path: str) -> None:
base_path = Path(path).with_suffix(".replay").resolve()
base_path.mkdir(parents=True, exist_ok=True)

sequential_dict = {}
sequential_dict["_class_name"] = self.__class__.__name__
self._sequences.reset_index().to_json(base_path / "sequences.json")
sequential_dict["init_args"] = {
"tensor_schema": self._tensor_schema._get_object_args(),
"query_id_column": self._query_id_column,
"item_id_column": self._item_id_column,
"sequences_path": "sequences.json",
}

with open(base_path / "init_args.json", "w+") as file:
json.dump(sequential_dict, file)


class PandasSequentialDataset(SequentialDataset):
"""
Expand Down Expand Up @@ -174,6 +194,25 @@ def _check_if_schema_matches_data(cls, tensor_schema: TensorSchema, data: Pandas
msg = "Tensor schema does not match with provided data frame"
raise ValueError(msg)

@classmethod
def load(cls, path: str, **kwargs) -> "PandasSequentialDataset":
"""
Method for loading a PandasSequentialDataset object from a `.replay` directory.
"""
base_path = Path(path).with_suffix(".replay").resolve()
with open(base_path / "init_args.json", "r") as file:
sequential_dict = json.loads(file.read())

sequences = pd.read_json(base_path / sequential_dict["init_args"]["sequences_path"])
dataset = cls(
tensor_schema=TensorSchema._create_object_by_args(sequential_dict["init_args"]["tensor_schema"]),
query_id_column=sequential_dict["init_args"]["query_id_column"],
item_id_column=sequential_dict["init_args"]["item_id_column"],
sequences=sequences,
)

return dataset


class PolarsSequentialDataset(PandasSequentialDataset):
"""
@@ -236,3 +275,22 @@ def _check_if_schema_matches_data(cls, tensor_schema: TensorSchema, data: Polars
if tensor_feature_name not in data:
msg = "Tensor schema does not match with provided data frame"
raise ValueError(msg)

@classmethod
def load(cls, path: str, **kwargs) -> "PolarsSequentialDataset":
"""
Method for loading a PolarsSequentialDataset object from a `.replay` directory.
"""
base_path = Path(path).with_suffix(".replay").resolve()
with open(base_path / "init_args.json", "r") as file:
sequential_dict = json.loads(file.read())

sequences = pl.DataFrame(pd.read_json(base_path / sequential_dict["init_args"]["sequences_path"]))
dataset = cls(
tensor_schema=TensorSchema._create_object_by_args(sequential_dict["init_args"]["tensor_schema"]),
query_id_column=sequential_dict["init_args"]["query_id_column"],
item_id_column=sequential_dict["init_args"]["item_id_column"],
sequences=sequences,
)

return dataset
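
Taken together, `save` writes a directory with a `.replay` suffix containing `sequences.json` and `init_args.json`, and `load` reconstructs the dataset from them. A minimal round-trip sketch (the column names and the `schema`/`sequences_df` objects are assumptions, not taken from the diff):

# Hedged sketch of the new SequentialDataset save/load round trip.
dataset = PandasSequentialDataset(
    tensor_schema=schema,        # a TensorSchema describing the sequence features
    query_id_column="user_id",
    item_id_column="item_id",
    sequences=sequences_df,      # pandas DataFrame with one row of sequences per user
)
dataset.save("my_dataset")       # creates my_dataset.replay/ with sequences.json and init_args.json
restored = PandasSequentialDataset.load("my_dataset")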
4 changes: 3 additions & 1 deletion replay/utils/common.py
@@ -41,7 +41,7 @@
]

if TORCH_AVAILABLE:
from replay.data.nn import SequenceTokenizer
from replay.data.nn import PandasSequentialDataset, PolarsSequentialDataset, SequenceTokenizer

SavableObject = Union[
ColdUserRandomSplitter,
@@ -53,6 +53,8 @@
TimeSplitter,
TwoStageSplitter,
SequenceTokenizer,
PandasSequentialDataset,
PolarsSequentialDataset,
]


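
With both dataset classes added to `SavableObject`, the generic helpers in `replay.utils.common` can persist and restore them (the `_class_name` field written to `init_args.json` presumably drives dispatch on load). A hedged usage sketch mirroring the test below:

from replay.utils.common import load_from_replay, save_to_replay

save_to_replay(sequential_dataset, "some/path")  # any path; a .replay directory is created on save
restored = load_from_replay("some/path")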
8 changes: 4 additions & 4 deletions tests/data/nn/conftest.py
@@ -405,10 +405,10 @@ def item_id_and_item_feature_schema():
def sequential_info(scope="package"):
sequences = pd.DataFrame(
[
(0, [1], [0, 1], [1, 2], np.array([[1, 2], [1, 2], [1, 2]])),
(1, [2], [0, 2, 3], [1, 3, 4], np.array([[1, 2], [1, 2]])),
(2, [3], [1], [2], np.array([[1, 2]])),
(3, [4], [0, 1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6], np.array([[1, 2], [1, 2], [1, 2], [1, 2]])),
(0, [1], [0, 1], [1, 2], [[1, 2], [1, 2], [1, 2]]),
(1, [2], [0, 2, 3], [1, 3, 4], [[1, 2], [1, 2]]),
(2, [3], [1], [2], [[1, 2]]),
(3, [4], [0, 1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6], [[1, 2], [1, 2], [1, 2], [1, 2]]),
],
columns=[
"user_id",
25 changes: 25 additions & 0 deletions tests/data/nn/test_sequential_dataset.py
@@ -9,6 +9,7 @@

from replay.data import FeatureHint
from replay.utils import TORCH_AVAILABLE
from replay.utils.common import load_from_replay, save_to_replay

if TORCH_AVAILABLE:
from replay.data.nn import PandasSequentialDataset, PolarsSequentialDataset
@@ -165,3 +166,27 @@ def test_get_sequence_by_query_id(dataset_type, request):
sequential_info = request.getfixturevalue("sequential_info_polars")
dataset = dataset_type(**sequential_info)
assert np.array_equal(dataset.get_sequence_by_query_id(10, "some_item_feature"), np.array([], dtype=np.int64))


@pytest.mark.core
@pytest.mark.parametrize("dataset", ["sequential_info", "sequential_info_polars"])
def test_save_and_load_sequential_dataset(dataset, request, tmp_path):
data = request.getfixturevalue(dataset)

if isinstance(data["sequences"], pd.DataFrame):
obj_type = PandasSequentialDataset
else:
obj_type = PolarsSequentialDataset

sequential_dataset = obj_type(**data)
save_to_replay(sequential_dataset, tmp_path)
loaded_sequential_dataset = load_from_replay(tmp_path)

assert sequential_dataset._query_id_column == loaded_sequential_dataset._query_id_column
assert sequential_dataset._item_id_column == loaded_sequential_dataset._item_id_column
assert (
sequential_dataset._tensor_schema._get_object_args()
== loaded_sequential_dataset._tensor_schema._get_object_args()
)
assert all(sequential_dataset._sequences.columns == loaded_sequential_dataset._sequences.columns)
assert sequential_dataset._sequences.equals(loaded_sequential_dataset._sequences)
