diff --git a/replay/data/nn/sequence_tokenizer.py b/replay/data/nn/sequence_tokenizer.py index 0b3ae028..33debdac 100644 --- a/replay/data/nn/sequence_tokenizer.py +++ b/replay/data/nn/sequence_tokenizer.py @@ -535,9 +535,18 @@ def _process_feature(self, tensor_feature_name: str) -> _T: return self._process_num_feature(tensor_feature) assert False, "Unknown tensor feature type" - @abc.abstractmethod - def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover - pass + def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> _T: + """ + Process a numerical tensor feature depending on its source. + """ + assert tensor_feature.feature_sources is not None + if tensor_feature.feature_source.source == FeatureSource.INTERACTIONS: + return self._process_num_interaction_feature(tensor_feature) + if tensor_feature.feature_source.source == FeatureSource.QUERY_FEATURES: + return self._process_num_query_feature(tensor_feature) + if tensor_feature.feature_source.source == FeatureSource.ITEM_FEATURES: + return self._process_num_item_feature(tensor_feature) + assert False, "Unknown tensor feature source table" def _process_cat_feature(self, tensor_feature: TensorFeatureInfo) -> _T: """ @@ -564,6 +573,18 @@ def _process_cat_query_feature(self, tensor_feature: TensorFeatureInfo) -> _T: def _process_cat_item_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover pass + @abc.abstractmethod + def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover + pass + + @abc.abstractmethod + def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover + pass + + @abc.abstractmethod + def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> _T: # pragma: no cover + pass + class _PandasSequenceProcessor(_BaseSequenceProcessor[PandasDataFrame]): """ @@ -606,42 +627,57 @@ def process_features(self) -> PandasDataFrame: return 
PandasDataFrame(all_features) - def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]: + def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]: """ - Process numerical feature for all sources. + Process numerical interaction feature. :param tensor_feature: tensor feature information. - :returns: sequences for each query. - If feature came from item features then gets item features values. - If feature came from interactions then gets values from interactions. + :returns: tensor feature column as sequences from `grouped_interactions`. """ - assert tensor_feature.feature_source is not None assert tensor_feature.is_seq + source = tensor_feature.feature_source + assert source is not None + + return [np.array(sequence) for sequence in self._grouped_interactions[source.column]] + + def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]: + """ + Process numerical feature from item features dataset. + + :param tensor_feature: tensor feature information. + + :returns: item features as a sequence for each item in a sequence for each query. 
+ """ + assert tensor_feature.is_seq + assert self._item_features is not None + + source = tensor_feature.feature_source + assert source is not None + + item_feature = self._item_features[source.column] values: List[np.ndarray] = [] - for pos, item_id_sequence in enumerate(self._grouped_interactions[self._item_id_column]): - all_features_for_user = [] - if tensor_feature.feature_source.source == FeatureSource.ITEM_FEATURES: - item_feature = self._item_features[tensor_feature.feature_source.column] - feature_sequence = item_feature.loc[item_id_sequence].values - all_features_for_user.append(feature_sequence.tolist()) - elif tensor_feature.feature_source.source == FeatureSource.INTERACTIONS: - sequence = self._grouped_interactions[tensor_feature.feature_source.column][pos] - all_features_for_user.append(sequence) - else: - assert False, "Unknown tensor feature source table" - all_seqs = np.array(all_features_for_user) - if tensor_feature.feature_hint == FeatureHint.TIMESTAMP: - all_seqs = all_seqs.reshape(-1) - elif not tensor_feature.is_list: - all_seqs = all_seqs.reshape(-1, 1) + for item_id_sequence in self._grouped_interactions[self._item_id_column]: + feature_sequence = item_feature.loc[item_id_sequence].values + if tensor_feature.feature_type == FeatureType.NUMERICAL_LIST: + values.append(feature_sequence.tolist()) else: - all_seqs = all_seqs.squeeze(0) - values.append(all_seqs) + values.append(np.array(feature_sequence)) + return values + def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]: + """ + Process numerical feature from query features dataset. + + :param tensor_feature: tensor feature information. + + :returns: sequences with length of item sequence for each query for + sequential features and one size sequences otherwise. + """ + return self._process_cat_query_feature(tensor_feature) + def _process_cat_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> List[np.ndarray]: """ Process categorical interaction feature. 
@@ -732,42 +768,37 @@ def process_features(self) -> PolarsDataFrame: data = data.join(self._process_feature(tensor_feature_name), on=self._query_id_column, how="left") return data - def _process_num_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame: - if tensor_feature.feature_source.source == FeatureSource.INTERACTIONS: - result = self._grouped_interactions.select( - self._query_id_column, tensor_feature.feature_source.column - ).rename({tensor_feature.feature_source.column: tensor_feature.name}) - elif tensor_feature.feature_source.source == FeatureSource.ITEM_FEATURES: - result = ( - self._grouped_interactions.select(self._query_id_column, self._item_id_column).map_rows( - lambda x: ( - x[0], - self._item_features.select(tensor_feature.feature_source.column) - .filter(self._item_features[self._item_id_column].is_in(x[1])) - .to_series() - .to_list(), - ) - ) - ).rename({"column_0": self._query_id_column, "column_1": tensor_feature.name}) - else: - assert False, "Unknown tensor feature source table" - - reshape_size = None - if tensor_feature.feature_hint == FeatureHint.TIMESTAMP: - reshape_size = (-1,) - elif not tensor_feature.is_list: - reshape_size = (-1, 1) - - if reshape_size is not None: - result = pl.DataFrame( - { - self._query_id_column: result[self._query_id_column], - tensor_feature.name: [ - np.array(x).reshape(reshape_size).tolist() for x in result[tensor_feature.name].to_list() - ], - } - ) - return result + def _process_num_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame: + """ + Process numerical interaction feature. + + :param tensor_feature: tensor feature information. + + :returns: tensor feature column as sequences from `grouped_interactions`. + """ + return self._process_cat_interaction_feature(tensor_feature) + + def _process_num_query_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame: + """ + Process numerical feature from query features dataset. 
+ + :param tensor_feature: tensor feature information. + + :returns: sequences with length of item sequence for each query for + sequential features and one size sequences otherwise. + """ + return self._process_cat_query_feature(tensor_feature) + + def _process_num_item_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame: + """ + Process numerical feature from item features dataset. + + :param tensor_feature: tensor feature information. + + :returns: item features as a sequence for each item in a sequence + for each query. + """ + return self._process_cat_item_feature(tensor_feature) def _process_cat_interaction_feature(self, tensor_feature: TensorFeatureInfo) -> PolarsDataFrame: """ diff --git a/replay/data/nn/torch_sequential_dataset.py b/replay/data/nn/torch_sequential_dataset.py index 40171c37..5c88d834 100644 --- a/replay/data/nn/torch_sequential_dataset.py +++ b/replay/data/nn/torch_sequential_dataset.py @@ -96,7 +96,7 @@ def _generate_tensor_feature( sequence = self._sequential.get_sequence(sequence_index, feature.name) if feature.is_seq: sequence = sequence[sequence_offset : sequence_offset + self._max_sequence_length] - tensor_dtype = self._get_tensor_dtype(feature) + tensor_dtype = self._get_tensor_dtype(sequence) tensor_sequence = torch.tensor(sequence, dtype=tensor_dtype) if feature.is_seq: tensor_sequence = self._pad_sequence(tensor_sequence, feature.padding_value) @@ -126,10 +126,10 @@ def _pad_sequence(self, sequence: torch.Tensor, padding_value: int) -> torch.Ten padded_sequence[-len(sequence) :].copy_(sequence) return padded_sequence - def _get_tensor_dtype(self, feature: TensorFeatureInfo) -> torch.dtype: - if feature.is_cat: + def _get_tensor_dtype(self, array: np.array) -> torch.dtype: + if np.issubdtype(array.dtype, np.integer): return torch.long - if feature.is_num: + if np.issubdtype(array.dtype, np.floating): return torch.float32 assert False, "Unknown tensor feature type" diff --git a/tests/data/nn/conftest.py 
b/tests/data/nn/conftest.py index a50d1498..cd1afd15 100644 --- a/tests/data/nn/conftest.py +++ b/tests/data/nn/conftest.py @@ -37,6 +37,13 @@ def query_features(): [5, 7, 6], [8], ], + "user_num": [1.0, 2.3, 11.8, -1.6], + "user_num_list": [ + [1.1, 2.1], + [4.2, 3.2], + [5.3, 7.3, 6.3], + [8.4], + ], } ) @@ -163,6 +170,8 @@ def small_feature_schema(): FeatureInfo("timestamp", FeatureType.NUMERICAL, FeatureHint.TIMESTAMP, FeatureSource.INTERACTIONS), FeatureInfo("user_cat", FeatureType.CATEGORICAL, None, FeatureSource.QUERY_FEATURES), FeatureInfo("user_cat_list", FeatureType.CATEGORICAL_LIST, None, FeatureSource.QUERY_FEATURES), + FeatureInfo("user_num", FeatureType.NUMERICAL, None, FeatureSource.QUERY_FEATURES), + FeatureInfo("user_num_list", FeatureType.NUMERICAL_LIST, None, FeatureSource.QUERY_FEATURES), FeatureInfo("item_cat", FeatureType.CATEGORICAL, None, FeatureSource.ITEM_FEATURES), FeatureInfo("item_cat_list", FeatureType.CATEGORICAL_LIST, None, FeatureSource.ITEM_FEATURES), FeatureInfo("item_num", FeatureType.NUMERICAL, None, FeatureSource.ITEM_FEATURES), diff --git a/tests/data/nn/test_sequence_tokenizer.py b/tests/data/nn/test_sequence_tokenizer.py index 3172947a..28d0ed6a 100644 --- a/tests/data/nn/test_sequence_tokenizer.py +++ b/tests/data/nn/test_sequence_tokenizer.py @@ -312,10 +312,10 @@ def test_item_features_are_grouped_to_sequences(dataset, item_id_and_item_featur tokenizer, "item_num", answers={ - 1: [[1.1], [1.2]], - 2: [[1.1], [1.3], [1.4]], - 3: [[1.2]], - 4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]], + 1: [1.1, 1.2], + 2: [1.1, 1.3, 1.4], + 3: [1.2], + 4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6], }, ) _compare_sequence( @@ -472,6 +472,13 @@ def test_process_numerical_features(dataset, request): feature_type=FeatureType.NUMERICAL, feature_sources=[TensorFeatureSource(FeatureSource.ITEM_FEATURES, "item_num")], ), + TensorFeatureInfo( + "user_num", + tensor_dim=1, + is_seq=True, + feature_type=FeatureType.NUMERICAL, + 
feature_sources=[TensorFeatureSource(FeatureSource.QUERY_FEATURES, "user_num")], + ), TensorFeatureInfo( "doubled_feature", tensor_dim=2, @@ -498,10 +505,10 @@ def test_process_numerical_features(dataset, request): sequential_dataset = tokenizer.fit_transform(data) answers = { - 1: [[1.1], [1.2]], - 2: [[1.1], [1.3], [1.4]], - 3: [[1.2]], - 4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]], + 1: [1.1, 1.2], + 2: [1.1, 1.3, 1.4], + 3: [1.2], + 4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6], } _compare_sequence( sequential_dataset, @@ -509,11 +516,13 @@ def test_process_numerical_features(dataset, request): "item_num", answers, ) - for num_feature_name in ["feature", "item_num", "doubled_feature"]: + + for num_feature_name in ["feature", "item_num", "doubled_feature", "user_num"]: for query in sequential_dataset.get_all_query_ids(): query_decoded = tokenizer.query_id_encoder.inverse_mapping["user_id"][query] seq = sequential_dataset.get_sequence_by_query_id(query, num_feature_name) - assert seq.shape == (len(answers[query_decoded]), 1) + assert len(seq.shape) == 1 + assert seq.shape[0] == len(answers[query_decoded]) _compare_sequence( sequential_dataset, @@ -899,7 +908,6 @@ def test_save_and_load_different_features_to_keep( tokenizer = SequenceTokenizer.load(f"sequence_tokenizer.{extension}", use_pickle=use_pickle) some_item_feature_transformed = tokenizer.transform(data, tensor_features_to_keep=["item_num"]) - _compare_sequence( item_id_transformed, tokenizer, @@ -917,10 +925,10 @@ def test_save_and_load_different_features_to_keep( tokenizer, "item_num", answers={ - 1: [[1.1], [1.2]], - 2: [[1.1], [1.3], [1.4]], - 3: [[1.2]], - 4: [[1.1], [1.2], [1.3], [1.4], [1.5], [1.6]], + 1: [1.1, 1.2], + 2: [1.1, 1.3, 1.4], + 3: [1.2], + 4: [1.1, 1.2, 1.3, 1.4, 1.5, 1.6], }, ) try: diff --git a/tests/data/nn/test_torch_sequential_dataset.py b/tests/data/nn/test_torch_sequential_dataset.py index 8003aa19..61f14129 100644 --- a/tests/data/nn/test_torch_sequential_dataset.py +++ 
b/tests/data/nn/test_torch_sequential_dataset.py @@ -117,25 +117,25 @@ def test_can_get_windowed_query_feature(sequential_dataset: SequentialDataset): @pytest.mark.torch -def test_num_dtype(sequential_dataset, some_num_tensor_feature): - feature = TensorFeatureInfo(name="user_id", feature_type=FeatureType.NUMERICAL, tensor_dim=64) +def test_num_dtype(sequential_dataset): + array = np.array([[1.0, 2.0], [3.0, 4.0]]) assert ( TorchSequentialDataset( sequential_dataset, max_sequence_length=3, sliding_window_step=2, - )._get_tensor_dtype(feature) + )._get_tensor_dtype(array) == torch.float32 ) - feature._feature_type = None + array = np.array([["q", "w"], ["e", "r"]]) with pytest.raises(AssertionError): TorchSequentialDataset( sequential_dataset, max_sequence_length=3, sliding_window_step=2, - )._get_tensor_dtype(feature) + )._get_tensor_dtype(array) @pytest.mark.torch