Skip to content

Commit

Permalink
Merge branch 'fix/tokenizer_memory_leak' into 'main'
Browse files Browse the repository at this point in the history
Fix memory leak inside the SequenceTokenizer tokenization method

See merge request ai-lab-pmo/mltools/recsys/RePlay!249
  • Loading branch information
OnlyDeniko committed Feb 19, 2025
2 parents eac7c6b + d5ae740 commit ad343ff
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 87 deletions.
157 changes: 94 additions & 63 deletions replay/data/nn/sequence_tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,18 @@ def _process_feature(self, tensor_feature_name: str) -> _T:
return self._process_num_feature(tensor_feature)
assert False, "Unknown tensor feature type"

def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> _T:
    """
    Dispatch a numerical tensor feature to the handler matching its source table.

    :param tensor_feature: tensor feature information.
    :returns: processed feature in the backend-specific container type.
    :raises AssertionError: if the feature has no source or an unknown source table.
    """
    assert tensor_feature.feature_sources is not None
    # Bind once instead of re-reading the `feature_source` property per branch;
    # also guards against an empty sources list yielding a None source.
    source = tensor_feature.feature_source
    assert source is not None
    if source.source == FeatureSource.INTERACTIONS:
        return self._process_num_interaction_feature(tensor_feature)
    if source.source == FeatureSource.QUERY_FEATURES:
        return self._process_num_query_feature(tensor_feature)
    if source.source == FeatureSource.ITEM_FEATURES:
        return self._process_num_item_feature(tensor_feature)
    assert False, "Unknown tensor feature source table"

def _process_cat_feature(self, tensor_feature: TensorFeatureInfo) -> _T:
"""
Expand All @@ -564,6 +573,18 @@ def _process_cat_query_feature(self, tensor_feature: TensorFeatureInfo) -> _T:
def _process_cat_item_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover
pass

@abc.abstractmethod
def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> _T:  # pragma: no cover
    """Process a numerical feature sourced from the interactions table (backend-specific)."""
    pass

@abc.abstractmethod
def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> _T:  # pragma: no cover
    """Process a numerical feature sourced from the query features table (backend-specific)."""
    pass

@abc.abstractmethod
def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> _T:  # pragma: no cover
    """Process a numerical feature sourced from the item features table (backend-specific)."""
    pass


class _PandasSequenceProcessor(_BaseSequenceProcessor[PandasDataFrame]):
"""
Expand Down Expand Up @@ -606,42 +627,57 @@ def process_features(self) -> PandasDataFrame:

return PandasDataFrame(all_features)

def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]:
    """
    Process a numerical interaction feature.

    :param tensor_feature: tensor feature information.
    :returns: the tensor feature column as one numpy sequence per query,
        taken directly from ``grouped_interactions``.
    """
    assert tensor_feature.is_seq

    source = tensor_feature.feature_source
    assert source is not None

    return [np.array(sequence) for sequence in self._grouped_interactions[source.column]]

def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]:
    """
    Process a numerical feature from the item features dataset.

    For every query, looks up the feature value of each item in the query's
    interaction sequence, producing one gathered sequence per query.

    :param tensor_feature: tensor feature information.
    :returns: one value sequence per query; plain Python lists for
        NUMERICAL_LIST features, numpy arrays otherwise.
    """
    assert tensor_feature.is_seq
    assert self._item_features is not None

    source = tensor_feature.feature_source
    assert source is not None

    # Single column lookup hoisted out of the per-query loop.
    item_feature = self._item_features[source.column]
    values: List[np.ndarray] = []
    for item_id_sequence in self._grouped_interactions[self._item_id_column]:
        # Gather feature values positionally for every item id in the sequence.
        feature_sequence = item_feature.loc[item_id_sequence].values
        if tensor_feature.feature_type == FeatureType.NUMERICAL_LIST:
            values.append(feature_sequence.tolist())
        else:
            values.append(np.array(feature_sequence))

    return values

def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]:
    """
    Process a numerical feature from the query features dataset.

    Numerical query features are expanded the same way as categorical ones,
    so the categorical handler is reused.

    :param tensor_feature: tensor feature information.
    :returns: the tensor feature column as sequences aligned with `grouped_interactions`.
    """
    return self._process_cat_query_feature(tensor_feature)

def _process_cat_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]:
"""
Process categorical interaction feature.
Expand Down Expand Up @@ -732,42 +768,37 @@ def process_features(self) -> PolarsDataFrame:
data = data.join(self._process_feature(tensor_feature_name), on=self._query_id_column, how="left")
return data

def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame:
    """
    Build a per-query column for a numerical tensor feature.

    :param tensor_feature: tensor feature information.
    :returns: dataframe with the query id column and one feature sequence per query.
    """
    if tensor_feature.feature_source.source == FeatureSource.INTERACTIONS:
        # Interactions are already grouped per query: just select and rename.
        result = self._grouped_interactions.select(
            self._query_id_column, tensor_feature.feature_source.column
        ).rename({tensor_feature.feature_source.column: tensor_feature.name})
    elif tensor_feature.feature_source.source == FeatureSource.ITEM_FEATURES:
        # For item-sourced features, look up the feature value of every item id
        # in each query's sequence.
        # NOTE(review): map_rows runs a Python lambda per query and filters the
        # whole item-features frame each time — presumably the inefficiency the
        # surrounding commit addresses; confirm against profiling.
        result = (
            self._grouped_interactions.select(self._query_id_column, self._item_id_column).map_rows(
                lambda x: (
                    x[0],
                    self._item_features.select(tensor_feature.feature_source.column)
                    .filter(self._item_features[self._item_id_column].is_in(x[1]))
                    .to_series()
                    .to_list(),
                )
            )
        ).rename({"column_0": self._query_id_column, "column_1": tensor_feature.name})
    else:
        assert False, "Unknown tensor feature source table"

    # Timestamp features stay flat; scalar (non-list) features become
    # one-element rows; list features keep their shape untouched.
    reshape_size = None
    if tensor_feature.feature_hint == FeatureHint.TIMESTAMP:
        reshape_size = (-1,)
    elif not tensor_feature.is_list:
        reshape_size = (-1, 1)

    if reshape_size is not None:
        # Rebuild the frame with every sequence reshaped to the target layout.
        result = pl.DataFrame(
            {
                self._query_id_column: result[self._query_id_column],
                tensor_feature.name: [
                    np.array(x).reshape(reshape_size).tolist() for x in result[tensor_feature.name].to_list()
                ],
            }
        )
    return result
def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame:
    """
    Process a numerical interaction feature.

    Delegates to the categorical handler: the column extraction logic is identical.

    :param tensor_feature: tensor feature information.
    :returns: the tensor feature column as sequences from `grouped_interactions`.
    """
    return self._process_cat_interaction_feature(tensor_feature)

def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame:
    """
    Process a numerical feature from the query features dataset.

    Delegates to the categorical handler: the expansion logic is identical.

    :param tensor_feature: tensor feature information.
    :returns: sequences with the length of the item sequence for each query for
        sequential features, and one-element sequences otherwise.
    """
    return self._process_cat_query_feature(tensor_feature)

def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame:
    """
    Process a numerical feature from the item features dataset.

    Delegates to the categorical handler: the per-item lookup logic is identical.

    :param tensor_feature: tensor feature information.
    :returns: item feature values as a sequence for each item in a sequence,
        for each query.
    """
    return self._process_cat_item_feature(tensor_feature)

def _process_cat_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame:
"""
Expand Down
8 changes: 4 additions & 4 deletions replay/data/nn/torch_sequential_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _generate_tensor_feature(
sequence = self._sequential.get_sequence(sequence_index, feature.name)
if feature.is_seq:
sequence = sequence[sequence_offset : sequence_offset + self._max_sequence_length]
tensor_dtype = self._get_tensor_dtype(feature)
tensor_dtype = self._get_tensor_dtype(sequence)
tensor_sequence = torch.tensor(sequence, dtype=tensor_dtype)
if feature.is_seq:
tensor_sequence = self._pad_sequence(tensor_sequence, feature.padding_value)
Expand Down Expand Up @@ -126,10 +126,10 @@ def _pad_sequence(self, sequence: torch.Tensor, padding_value: int) -> torch.Ten
padded_sequence[-len(sequence) :].copy_(sequence)
return padded_sequence

def _get_tensor_dtype(self, feature: TensorFeatureInfo) -> torch.dtype:
if feature.is_cat:
def _get_tensor_dtype(self, array: np.array) -> torch.dtype:
if np.issubdtype(array.dtype, np.integer):
return torch.long
if feature.is_num:
if np.issubdtype(array.dtype, np.floating):
return torch.float32
assert False, "Unknown tensor feature type"

Expand Down
9 changes: 9 additions & 0 deletions tests/data/nn/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ def query_features():
[5, 7, 6],
[8],
],
"user_num": [1.0, 2.3, 11.8, -1.6],
"user_num_list": [
[1.1, 2.1],
[4.2, 3.2],
[5.3, 7.3, 6.3],
[8.4],
],
}
)

Expand Down Expand Up @@ -163,6 +170,8 @@ def small_feature_schema():
FeatureInfo("timestamp", FeatureType.NUMERICAL, FeatureHint.TIMESTAMP, FeatureSource.INTERACTIONS),
FeatureInfo("user_cat", FeatureType.CATEGORICAL, None, FeatureSource.QUERY_FEATURES),
FeatureInfo("user_cat_list", FeatureType.CATEGORICAL_LIST, None, FeatureSource.QUERY_FEATURES),
FeatureInfo("user_num", FeatureType.NUMERICAL, None, FeatureSource.QUERY_FEATURES),
FeatureInfo("user_num_list", FeatureType.NUMERICAL_LIST, None, FeatureSource.QUERY_FEATURES),
FeatureInfo("item_cat", FeatureType.CATEGORICAL, None, FeatureSource.ITEM_FEATURES),
FeatureInfo("item_cat_list", FeatureType.CATEGORICAL_LIST, None, FeatureSource.ITEM_FEATURES),
FeatureInfo("item_num", FeatureType.NUMERICAL, None, FeatureSource.ITEM_FEATURES),
Expand Down
38 changes: 23 additions & 15 deletions tests/data/nn/test_sequence_tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ def test_item_features_are_grouped_to_sequences(dataset, item_id_and_item_featur
tokenizer,
"item_num",
answers={
1: [[1.1], [1.2]],
2: [[1.1], [1.3], [1.4]],
3: [[1.2]],
4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]],
1: [1.1, 1.2],
2: [1.1, 1.3, 1.4],
3: [1.2],
4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6],
},
)
_compare_sequence(
Expand Down Expand Up @@ -472,6 +472,13 @@ def test_process_numerical_features(dataset, request):
feature_type=FeatureType.NUMERICAL,
feature_sources=[TensorFeatureSource(FeatureSource.ITEM_FEATURES, "item_num")],
),
TensorFeatureInfo(
"user_num",
tensor_dim=1,
is_seq=True,
feature_type=FeatureType.NUMERICAL,
feature_sources=[TensorFeatureSource(FeatureSource.QUERY_FEATURES, "user_num")],
),
TensorFeatureInfo(
"doubled_feature",
tensor_dim=2,
Expand All @@ -498,22 +505,24 @@ def test_process_numerical_features(dataset, request):
sequential_dataset = tokenizer.fit_transform(data)

answers = {
1: [[1.1], [1.2]],
2: [[1.1], [1.3], [1.4]],
3: [[1.2]],
4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]],
1: [1.1, 1.2],
2: [1.1, 1.3, 1.4],
3: [1.2],
4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6],
}
_compare_sequence(
sequential_dataset,
tokenizer,
"item_num",
answers,
)
for num_feature_name in ["feature", "item_num", "doubled_feature"]:

for num_feature_name in ["feature", "item_num", "doubled_feature", "user_num"]:
for query in sequential_dataset.get_all_query_ids():
query_decoded = tokenizer.query_id_encoder.inverse_mapping["user_id"][query]
seq = sequential_dataset.get_sequence_by_query_id(query, num_feature_name)
assert seq.shape == (len(answers[query_decoded]), 1)
assert len(seq.shape) == 1
assert seq.shape[0] == len(answers[query_decoded])

_compare_sequence(
sequential_dataset,
Expand Down Expand Up @@ -899,7 +908,6 @@ def test_save_and_load_different_features_to_keep(

tokenizer = SequenceTokenizer.load(f"sequence_tokenizer.{extension}", use_pickle=use_pickle)
some_item_feature_transformed = tokenizer.transform(data, tensor_features_to_keep=["item_num"])

_compare_sequence(
item_id_transformed,
tokenizer,
Expand All @@ -917,10 +925,10 @@ def test_save_and_load_different_features_to_keep(
tokenizer,
"item_num",
answers={
1: [[1.1], [1.2]],
2: [[1.1], [1.3], [1.4]],
3: [[1.2]],
4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]],
1: [1.1, 1.2],
2: [1.1, 1.3, 1.4],
3: [1.2],
4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6],
},
)
try:
Expand Down
10 changes: 5 additions & 5 deletions tests/data/nn/test_torch_sequential_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,25 @@ def test_can_get_windowed_query_feature(sequential_dataset: SequentialDataset):


@pytest.mark.torch
def test_num_dtype(sequential_dataset):
    """Floating arrays map to torch.float32; non-numeric arrays are rejected."""
    array = np.array([[1.0, 2.0], [3.0, 4.0]])
    assert (
        TorchSequentialDataset(
            sequential_dataset,
            max_sequence_length=3,
            sliding_window_step=2,
        )._get_tensor_dtype(array)
        == torch.float32
    )

    # String arrays have no matching torch dtype, so the mapping must fail loudly.
    array = np.array([["q", "w"], ["e", "r"]])

    with pytest.raises(AssertionError):
        TorchSequentialDataset(
            sequential_dataset,
            max_sequence_length=3,
            sliding_window_step=2,
        )._get_tensor_dtype(array)


@pytest.mark.torch
Expand Down

0 comments on commit ad343ff

Please sign in to comment.