Skip to content

Commit

Permalink
Merge branch 'feature/deterministic_dataframe_le' into 'main'
Browse files Browse the repository at this point in the history
Rewrite Label Encoder in deterministic mode

See merge request ai-lab-pmo/mltools/recsys/RePlay!244
  • Loading branch information
OnlyDeniko committed Feb 19, 2025
2 parents bbcc2ea + 8940551 commit eac7c6b
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 33 deletions.
2 changes: 1 addition & 1 deletion replay/preprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
HistoryBasedFeaturesProcessor,
LogStatFeaturesProcessor,
)
from .label_encoder import LabelEncoder, LabelEncodingRule, SequenceEncodingRule
from .label_encoder import LabelEncoder, LabelEncoderPartialFitWarning, LabelEncodingRule, SequenceEncodingRule
from .sessionizer import Sessionizer
76 changes: 45 additions & 31 deletions replay/preprocessing/label_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@
)

if PYSPARK_AVAILABLE:
from pyspark.sql import (
functions as sf,
)
from pyspark.sql.types import LongType, StructType
from pyspark.storagelevel import StorageLevel
from pyspark.sql import Window, functions as sf # noqa: I001
from pyspark.sql.types import LongType

HandleUnknownStrategies = Literal["error", "use_default_value", "drop"]

Expand All @@ -39,6 +36,10 @@ class LabelEncoderTransformWarning(Warning):
"""Label encoder transform warning."""


class LabelEncoderPartialFitWarning(Warning):
"""Label encoder partial fit warning."""


class BaseLabelEncodingRule(abc.ABC): # pragma: no cover
"""
Interface of the label encoding rule
Expand Down Expand Up @@ -169,22 +170,19 @@ def _make_inverse_mapping_list(self) -> List:
return inverse_mapping_list

def _fit_spark(self, df: SparkDataFrame) -> None:
unique_col_values = df.select(self._col).distinct().persist(StorageLevel.MEMORY_ONLY)
unique_col_values = df.select(self._col).distinct()
window_function_give_ids = Window.orderBy(self._col)

mapping_on_spark = (
unique_col_values.rdd.zipWithIndex()
.toDF(
StructType()
.add("_1", StructType().add(self._col, df.schema[self._col].dataType, True), True)
.add("_2", LongType(), True)
unique_col_values.withColumn(
self._target_col,
sf.row_number().over(window_function_give_ids).cast(LongType()),
)
.select(sf.col(f"_1.{self._col}").alias(self._col), sf.col("_2").alias(self._target_col))
.persist(StorageLevel.MEMORY_ONLY)
.withColumn(self._target_col, sf.col(self._target_col) - 1)
.select(self._col, self._target_col)
)

self._mapping = mapping_on_spark.rdd.collectAsMap()
mapping_on_spark.unpersist()
unique_col_values.unpersist()

def _fit_pandas(self, df: PandasDataFrame) -> None:
unique_col_values = df[self._col].drop_duplicates().reset_index(drop=True)
Expand Down Expand Up @@ -228,34 +226,43 @@ def fit(self, df: DataFrameLike) -> "LabelEncodingRule":

def _partial_fit_spark(self, df: SparkDataFrame) -> None:
assert self._mapping is not None

max_value = sf.lit(max(self._mapping.values()) + 1)
already_fitted = list(self._mapping.keys())
new_values = {x[self._col] for x in df.select(self._col).distinct().collect()} - set(already_fitted)
new_values_list = [[x] for x in new_values]
new_values_df: SparkDataFrame = get_spark_session().createDataFrame(new_values_list, schema=[self._col])
new_unique_values = new_values_df.join(df, on=self._col, how="left").select(self._col)

new_data: dict = (
new_unique_values.rdd.zipWithIndex()
.toDF(
StructType()
.add("_1", StructType().add(self._col, df.schema[self._col].dataType), True)
.add("_2", LongType(), True)
if len(new_values_list) == 0:
warnings.warn(
"partial_fit will have no effect because "
f"there are no new values in the incoming dataset at '{self.column}' column",
LabelEncoderPartialFitWarning,
)
.select(sf.col(f"_1.{self._col}").alias(self._col), sf.col("_2").alias(self._target_col))
.withColumn(self._target_col, sf.col(self._target_col) + max_value)
return
new_unique_values_df: SparkDataFrame = get_spark_session().createDataFrame(new_values_list, schema=[self._col])
window_function_give_ids = Window.orderBy(self._col)
new_part_of_mapping = (
new_unique_values_df.withColumn(
self._target_col,
sf.row_number().over(window_function_give_ids).cast(LongType()),
)
.withColumn(self._target_col, sf.col(self._target_col) - 1 + max_value)
.select(self._col, self._target_col)
.rdd.collectAsMap()
)
self._mapping.update(new_data)
self._inverse_mapping.update({v: k for k, v in new_data.items()})
self._inverse_mapping_list.extend(new_data.keys())
new_unique_values.unpersist()
self._mapping.update(new_part_of_mapping)
self._inverse_mapping.update({v: k for k, v in new_part_of_mapping.items()})
self._inverse_mapping_list.extend(new_part_of_mapping.keys())

def _partial_fit_pandas(self, df: PandasDataFrame) -> None:
assert self._mapping is not None

new_unique_values = set(df[self._col].tolist()) - set(self._mapping)
if len(new_unique_values) == 0:
warnings.warn(
"partial_fit will have no effect because "
f"there are no new values in the incoming dataset at '{self.column}' column",
LabelEncoderPartialFitWarning,
)
return
last_mapping_value = max(self._mapping.values())
new_data: dict = {value: last_mapping_value + i for i, value in enumerate(new_unique_values, start=1)}
self._mapping.update(new_data)
Expand All @@ -266,6 +273,13 @@ def _partial_fit_polars(self, df: PolarsDataFrame) -> None:
assert self._mapping is not None

new_unique_values = set(df.select(self._col).unique().to_series().to_list()) - set(self._mapping)
if len(new_unique_values) == 0:
warnings.warn(
"partial_fit will have no effect because "
f"there are no new values in the incoming dataset at '{self.column}' column",
LabelEncoderPartialFitWarning,
)
return
new_data: dict = {value: max(self._mapping.values()) + i for i, value in enumerate(new_unique_values, start=1)}
self._mapping.update(new_data)
self._inverse_mapping.update({v: k for k, v in new_data.items()})
Expand Down
35 changes: 35 additions & 0 deletions tests/preprocessing/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import random

import numpy as np
import pandas as pd
import polars as pl
Expand Down Expand Up @@ -749,6 +751,39 @@ def simple_dataframe(spark, columns):
return spark.createDataFrame(data, schema=columns)


@pytest.fixture(scope="module")
def static_string_pd_df():
data = []
for _ in range(5000):
data.append(["Moscow"])
data.append(["Novgorod"])
return pd.DataFrame(data, columns=["random_string"])


@pytest.fixture(scope="module")
def static_string_spark_df(
spark,
static_string_pd_df,
):
return spark.createDataFrame(static_string_pd_df, schema=list(static_string_pd_df.columns))


@pytest.fixture(scope="module")
def static_string_pl_df(static_string_pd_df):
return pl.from_pandas(static_string_pd_df)


@pytest.fixture(scope="module")
def random_string_spark_df(spark):
random.seed(42)

def generate_random_string(length=10):
return "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=random.randint(1, length)))

data = [(generate_random_string(),) for _ in range(100_000)] * 4
return spark.createDataFrame(data, schema=["random_string"])


@pytest.fixture(scope="module")
def simple_dataframe_array(spark):
columns_array = ["user_id", "item_id", "timestamp"]
Expand Down
114 changes: 113 additions & 1 deletion tests/preprocessing/test_label_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pandas as pd
import pytest

from replay.preprocessing import LabelEncoder, LabelEncodingRule, SequenceEncodingRule
from replay.preprocessing import LabelEncoder, LabelEncoderPartialFitWarning, LabelEncodingRule, SequenceEncodingRule
from replay.utils import PYSPARK_AVAILABLE, PandasDataFrame, PolarsDataFrame
from tests.utils import sparkDataFrameEqual

Expand Down Expand Up @@ -64,6 +64,118 @@ def test_label_encoder_load_rule_spark(column, df_name, is_grouped_encoder, requ
pd.testing.assert_frame_equal(df1, df2)


@pytest.mark.spark
@pytest.mark.parametrize("column", ["random_string"])
def test_label_encoder_determinism(column, random_string_spark_df):
    """LabelEncoder.fit on Spark must yield the same mapping regardless of partitioning.

    Historically, repartitioning the input dataframe changed the mapping produced by
    the Spark fit path; this test pins the deterministic behavior.
    """
    mappings = []
    for num_partitions in (13, 11, 20):
        encoder = LabelEncoder([LabelEncodingRule(column)])
        encoder.fit(random_string_spark_df.repartition(num_partitions))
        mappings.append(encoder.mapping[column])

    mapping_1, mapping_2, mapping_3 = mappings
    assert mapping_1 == mapping_2, "LabelEncoder.fit works non-deterministically (comparison at launch 1 and 2)"
    assert mapping_1 == mapping_3, "LabelEncoder.fit works non-deterministically (comparison at launch 1 and 3)"
    assert mapping_2 == mapping_3, "LabelEncoder.fit works non-deterministically (comparison at launch 2 and 3)"


@pytest.mark.spark
@pytest.mark.parametrize("column", ["random_string"])
def test_label_encoder_partial_fit_determinism(column, random_string_spark_df, static_string_spark_df):
    """LabelEncoder.partial_fit on Spark must be deterministic w.r.t. input partitioning.

    Each encoder is first fitted on the static dataframe, then partially fitted on the
    same random dataframe repartitioned differently; the resulting mappings must match.
    """
    mappings = []
    for num_partitions in (13, 11, 20):
        encoder = LabelEncoder([LabelEncodingRule(column)])
        encoder.fit(static_string_spark_df)
        encoder.partial_fit(random_string_spark_df.repartition(num_partitions))
        mappings.append(encoder.mapping[column])

    mapping_1, mapping_2, mapping_3 = mappings
    # Fixed assertion messages: they previously said "LabelEncoder.fit", but this
    # test exercises partial_fit — the wrong name would mislead failure triage.
    assert mapping_1 == mapping_2, "LabelEncoder.partial_fit works non-deterministically (comparison at launch 1 and 2)"
    assert mapping_1 == mapping_3, "LabelEncoder.partial_fit works non-deterministically (comparison at launch 1 and 3)"
    assert mapping_2 == mapping_3, "LabelEncoder.partial_fit works non-deterministically (comparison at launch 2 and 3)"


@pytest.mark.spark
@pytest.mark.parametrize("column", ["random_string"])
def test_label_encoder_mapping_keys_is_sequence(column, random_string_spark_df, static_string_spark_df):
    """After fit + partial_fit, encoded IDs must form the contiguous range 0..N-1."""
    repartitioned = random_string_spark_df.repartition(13)
    encoder = LabelEncoder([LabelEncodingRule(column)])
    encoder.fit(static_string_spark_df)
    encoder.partial_fit(repartitioned)
    # The fixtures share no values ('Moscow'/'Novgorod' vs. lowercase-only strings),
    # so the total distinct count is just the sum of both distinct counts.
    expected_total = static_string_spark_df.distinct().count() + repartitioned.distinct().count()
    encoded_ids = list(encoder.mapping[column].values())
    assert encoded_ids == list(range(expected_total)), "encoded IDs of elements is not sequence"


@pytest.mark.parametrize(
    "df_name",
    [
        pytest.param("simple_dataframe_pandas", marks=pytest.mark.core),
        pytest.param("simple_dataframe_polars", marks=pytest.mark.core),
        pytest.param("simple_dataframe", marks=pytest.mark.spark),
    ],
)
def test_label_encoder_on_many_columns(df_name, request):
    """Fitting one rule per column must yield a non-empty mapping for every column."""
    df = request.getfixturevalue(df_name)
    encoder = LabelEncoder([LabelEncodingRule(col) for col in df.columns])
    encoder.fit(df)
    assert len(encoder.mapping) == len(df.columns), "Not all columns are calculated"
    assert all(len(values) for values in encoder.mapping.values()), "Some columns are without mappings after fit"


@pytest.mark.parametrize(
    "column, df_name",
    [
        pytest.param("random_string", "static_string_pd_df", marks=pytest.mark.core),
        pytest.param("random_string", "static_string_pl_df", marks=pytest.mark.core),
        pytest.param("random_string", "static_string_spark_df", marks=pytest.mark.spark),
    ],
)
def test_label_encoder_partial_fit_no_new_values_at_input(column, df_name, request):
    """partial_fit on already-seen data must warn and leave the mapping unchanged."""
    df = request.getfixturevalue(df_name)
    rule = LabelEncodingRule(column)
    encoder = LabelEncoder([rule])
    encoder.fit(df)
    mapping_before_partial_fit = encoder.mapping[column]
    with pytest.warns(LabelEncoderPartialFitWarning):
        encoder.partial_fit(df)
    mapping_after_partial_fit = encoder.mapping[column]
    assert len(mapping_before_partial_fit) == len(mapping_after_partial_fit), "count of elements in mappings not equal"
    # Bug fix: the original asserted mapping_after_partial_fit == mapping_after_partial_fit,
    # comparing the dict to itself (always true). Compare before vs. after instead.
    assert mapping_before_partial_fit == mapping_after_partial_fit, "mappings' keys are not equal"

    assert list(mapping_before_partial_fit.values()) == list(
        mapping_after_partial_fit.values()
    ), "mappings' values are not equal"


@pytest.mark.core
@pytest.mark.parametrize(
"column, df_name, is_grouped_encoder",
Expand Down

0 comments on commit eac7c6b

Please sign in to comment.