Skip to content

Commit

Permalink
Merge branch 'feature/grouped_le' into 'main'
Browse files Browse the repository at this point in the history
GroupedLabelEncoder implementation

See merge request ai-lab-pmo/mltools/recsys/RePlay!239
  • Loading branch information
monkey0head committed Jan 27, 2025
2 parents 5bb07f9 + 5939cac commit c93bc22
Show file tree
Hide file tree
Showing 4 changed files with 597 additions and 151 deletions.
2 changes: 1 addition & 1 deletion replay/preprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
HistoryBasedFeaturesProcessor,
LogStatFeaturesProcessor,
)
from .label_encoder import LabelEncoder, LabelEncodingRule
from .label_encoder import GroupedLabelEncodingRule, LabelEncoder, LabelEncodingRule
from .sessionizer import Sessionizer
264 changes: 240 additions & 24 deletions replay/preprocessing/label_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
import os
import warnings
from itertools import chain
from pathlib import Path
from typing import Dict, List, Literal, Mapping, Optional, Sequence, Union

Expand All @@ -25,7 +26,9 @@
)

if PYSPARK_AVAILABLE:
from pyspark.sql import functions as sf
from pyspark.sql import (
functions as sf,
)
from pyspark.sql.types import LongType, StructType
from pyspark.storagelevel import StorageLevel

Expand Down Expand Up @@ -81,7 +84,7 @@ def set_handle_unknown(self, handle_unknown: HandleUnknownStrategies) -> None:

class LabelEncodingRule(BaseLabelEncodingRule):
"""
Implementation of the encoding rule for categorical variables of PySpark and Pandas Data Frames.
Implementation of the encoding rule for categorical variables of PySpark, Pandas and Polars Data Frames.
Encodes target labels with value between 0 and n_classes-1 for the given column.
It is recommended to use together with the LabelEncoder.
"""
Expand Down Expand Up @@ -547,48 +550,261 @@ def load(cls, path: str) -> "LabelEncodingRule":
return encoding_rule


class GroupedLabelEncodingRule(LabelEncodingRule):
"""
Implementation of the encoding rule for grouped categorical variables of PySpark, Pandas and Polars Data Frames.
Grouped means that one cell of the table contains a list with categorical values.
Encodes target labels with value between 0 and n_classes-1 for the given column.
It is recommended to use together with the LabelEncoder.
"""

_FAKE_INDEX_COLUMN_NAME: str = "__index__"

def fit(self, df: DataFrameLike) -> "GroupedLabelEncodingRule":
"""
Fits encoder to input dataframe.
:param df: input dataframe.
:returns: fitted EncodingRule.
"""
if self._mapping is not None:
return self

if isinstance(df, PandasDataFrame):
self._fit_pandas(df[[self.column]].explode(self.column))
elif isinstance(df, SparkDataFrame):
self._fit_spark(df.select(self.column).withColumn(self.column, sf.explode(self.column)))
elif isinstance(df, PolarsDataFrame):
self._fit_polars(df.select(self.column).explode(self.column))
else:
msg = f"{self.__class__.__name__} is not implemented for {type(df)}"
raise NotImplementedError(msg)
self._inverse_mapping = self._make_inverse_mapping()
self._inverse_mapping_list = self._make_inverse_mapping_list()
if self._handle_unknown == "use_default_value" and self._default_value in self._inverse_mapping:
msg = (
"The used value for default_value "
f"{self._default_value} is one of the "
"values already used for encoding the "
"seen labels."
)
raise ValueError(msg)
self._is_fitted = True
return self

def partial_fit(self, df: DataFrameLike) -> "GroupedLabelEncodingRule":
"""
Fits new data to already fitted encoder.
:param df: input dataframe.
:returns: fitted EncodingRule.
"""
if self._mapping is None:
return self.fit(df)
if isinstance(df, SparkDataFrame):
self._partial_fit_spark(df.select(self.column).withColumn(self.column, sf.explode(self.column)))
elif isinstance(df, PandasDataFrame):
self._partial_fit_pandas(df[[self.column]].explode(self.column))
elif isinstance(df, PolarsDataFrame):
self._partial_fit_polars(df.select(self.column).explode(self.column))
else:
msg = f"{self.__class__.__name__} is not implemented for {type(df)}"
raise NotImplementedError(msg)

self._is_fitted = True
return self

def _transform_spark(self, df: SparkDataFrame, default_value: Optional[int]) -> SparkDataFrame:
map_expr = sf.create_map([sf.lit(x) for x in chain(*self.get_mapping().items())])
encoded_df = df.withColumn(self._target_col, sf.transform(self.column, lambda x: map_expr.getItem(x)))

if self._handle_unknown == "drop":
encoded_df = encoded_df.withColumn(self._target_col, sf.filter(self._target_col, lambda x: x.isNotNull()))
if encoded_df.select(sf.max(sf.size(self._target_col))).first()[0] == 0:
warnings.warn(
f"You are trying to transform dataframe with all values are unknown for {self._col}, "
"with `handle_unknown_strategy=drop` leads to empty dataframe",
LabelEncoderTransformWarning,
)
elif self._handle_unknown == "error":
if (
encoded_df.select(sf.sum(sf.array_contains(self._target_col, -1).isNull().cast("integer"))).first()[0]
!= 0
):
msg = f"Found unknown labels in column {self._col} during transform"
raise ValueError(msg)
else:
if default_value:
encoded_df = encoded_df.withColumn(
self._target_col,
sf.transform(self._target_col, lambda x: sf.when(x.isNull(), default_value).otherwise(x)),
)

result_df = encoded_df.drop(self._col).withColumnRenamed(self._target_col, self._col)
return result_df

def _transform_pandas(self, df: PandasDataFrame, default_value: Optional[int]) -> PandasDataFrame:
mapping = self.get_mapping()
joined_df = df.copy()
if self._handle_unknown == "drop":
max_array_len = 0

def encode_func(array_col):
nonlocal mapping, max_array_len
res = []
for x in array_col:
cur_len = 0
mapped = mapping.get(x)
if mapped is not None:
res.append(mapped)
cur_len += 1
max_array_len = max(max_array_len, cur_len)
return res

joined_df[self._target_col] = joined_df[self._col].apply(encode_func)
if max_array_len == 0:
warnings.warn(
f"You are trying to transform dataframe with all values are unknown for {self._col}, "
"with `handle_unknown_strategy=drop` leads to empty dataframe",
LabelEncoderTransformWarning,
)
elif self._handle_unknown == "error":
none_count = 0

def encode_func(array_col):
nonlocal mapping, none_count
res = []
for x in array_col:
mapped = mapping.get(x)
if mapped is None:
none_count += 1
else:
res.append(mapped)
return res

joined_df[self._target_col] = joined_df[self._col].apply(encode_func)
if none_count != 0:
msg = f"Found unknown labels in column {self._col} during transform"
raise ValueError(msg)
else:

def encode_func(array_col):
nonlocal mapping
return [mapping.get(x, default_value) for x in array_col]

joined_df[self._target_col] = joined_df[self._col].apply(encode_func)

result_df = joined_df.drop(self._col, axis=1).rename(columns={self._target_col: self._col})
return result_df

def _transform_polars(self, df: PolarsDataFrame, default_value: Optional[int]) -> SparkDataFrame:
transformed_df = df.with_columns(
pl.col(self._col)
.list.eval(
pl.element().replace_strict(
self.get_mapping(), default=default_value if self._handle_unknown == "use_default_value" else None
),
parallel=True,
)
.alias(self._target_col)
)
if self._handle_unknown == "drop":
transformed_df = transformed_df.with_columns(pl.col(self._target_col).list.drop_nulls())
if (
transformed_df.with_columns(pl.col(self._target_col).list.len()).select(pl.sum(self._target_col)).item()
== 0
):
warnings.warn(
f"You are trying to transform dataframe with all values are unknown for {self._col}, "
"with `handle_unknown_strategy=drop` leads to empty dataframe",
LabelEncoderTransformWarning,
)
elif self._handle_unknown == "error":
none_checker = transformed_df.with_columns(
pl.col(self._target_col).list.contains(pl.lit(None, dtype=pl.Int64)).cast(pl.Int64)
)
if none_checker.select(pl.sum(self._target_col)).item() != 0:
msg = f"Found unknown labels in column {self._col} during transform"
raise ValueError(msg)

result_df = transformed_df.drop(self._col).rename({self._target_col: self._col})
return result_df

def _inverse_transform_pandas(self, df: PandasDataFrame) -> PandasDataFrame:
decoded_df = df.copy()

def decode_func(array_col):
return [self._inverse_mapping_list[x] for x in array_col]

decoded_df[self._col] = decoded_df[self._col].apply(decode_func)
return decoded_df

def _inverse_transform_polars(self, df: PolarsDataFrame) -> PolarsDataFrame:
mapping_size = len(self._inverse_mapping_list)
transformed_df = df.with_columns(
pl.col(self._col).list.eval(
pl.element().replace_strict(old=list(range(mapping_size)), new=self._inverse_mapping_list),
parallel=True,
)
)
return transformed_df

def _inverse_transform_spark(self, df: SparkDataFrame) -> SparkDataFrame:
array_expr = sf.array([sf.lit(x) for x in self._inverse_mapping_list])
decoded_df = df.withColumn(
self._target_col, sf.transform(self._col, lambda x: sf.element_at(array_expr, x + 1))
)
return decoded_df.drop(self._col).withColumnRenamed(self._target_col, self._col)


class LabelEncoder:
"""
Applies multiple label encoding rules to the data frame.
>>> import pandas as pd
>>> user_interactions = pd.DataFrame([
... ("u1", "item_1", "item_1"),
... ("u2", "item_2", "item_2"),
... ("u3", "item_3", "item_3"),
... ], columns=["user_id", "item_1", "item_2"])
... ("u1", "item_1", "item_1", [1, 2, 3]),
... ("u2", "item_2", "item_2", [3, 4, 5]),
... ("u3", "item_3", "item_3", [-1, -2, 4]),
... ], columns=["user_id", "item_1", "item_2", "list"])
>>> user_interactions
user_id item_1 item_2
0 u1 item_1 item_1
1 u2 item_2 item_2
2 u3 item_3 item_3
>>> encoder = LabelEncoder(
... [LabelEncodingRule("user_id"), LabelEncodingRule("item_1"), LabelEncodingRule("item_2")]
... )
user_id item_1 item_2 list
0 u1 item_1 item_1 [1, 2, 3]
1 u2 item_2 item_2 [3, 4, 5]
2 u3 item_3 item_3 [-1, -2, 4]
>>> encoder = LabelEncoder([
... LabelEncodingRule("user_id"),
... LabelEncodingRule("item_1"),
... LabelEncodingRule("item_2"),
... GroupedLabelEncodingRule("list"),
... ])
>>> mapped_interactions = encoder.fit_transform(user_interactions)
>>> mapped_interactions
user_id item_1 item_2
0 0 0 0
1 1 1 1
2 2 2 2
user_id item_1 item_2 list
0 0 0 0 [0, 1, 2]
1 1 1 1 [2, 3, 4]
2 2 2 2 [5, 6, 3]
>>> encoder.mapping
{'user_id': {'u1': 0, 'u2': 1, 'u3': 2},
'item_1': {'item_1': 0, 'item_2': 1, 'item_3': 2},
'item_2': {'item_1': 0, 'item_2': 1, 'item_3': 2}}
'item_2': {'item_1': 0, 'item_2': 1, 'item_3': 2},
'list': {1: 0, 2: 1, 3: 2, 4: 3, 5: 4, -1: 5, -2: 6}}
>>> encoder.inverse_mapping
{'user_id': {0: 'u1', 1: 'u2', 2: 'u3'},
'item_1': {0: 'item_1', 1: 'item_2', 2: 'item_3'},
'item_2': {0: 'item_1', 1: 'item_2', 2: 'item_3'}}
'item_2': {0: 'item_1', 1: 'item_2', 2: 'item_3'},
'list': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: -1, 6: -2}}
>>> new_encoder = LabelEncoder([
... LabelEncodingRule("user_id", encoder.mapping["user_id"]),
... LabelEncodingRule("item_1", encoder.mapping["item_1"]),
... LabelEncodingRule("item_2", encoder.mapping["item_2"])
... LabelEncodingRule("item_2", encoder.mapping["item_2"]),
... GroupedLabelEncodingRule("list", encoder.mapping["list"]),
... ])
>>> new_encoder.inverse_transform(mapped_interactions)
user_id item_1 item_2
0 u1 item_1 item_1
1 u2 item_2 item_2
2 u3 item_3 item_3
user_id item_1 item_2 list
0 u1 item_1 item_1 [1, 2, 3]
1 u2 item_2 item_2 [3, 4, 5]
2 u3 item_3 item_3 [-1, -2, 4]
<BLANKLINE>
"""

Expand Down
Loading

0 comments on commit c93bc22

Please sign in to comment.