
Feature/gloo spark lightfm wrapper 0.4 #49

Open: wants to merge 39 commits into base: main

Changes from all commits (39 commits)
bd1a4ec
Add ANN interface and implementations
netang Mar 16, 2023
8eeeff8
Add dependencies
netang Mar 16, 2023
4ee8e58
Format code via black code formatter. Add doctest examples.
netang Mar 23, 2023
724a149
Add docstrings to ANNMixin methods
netang Mar 24, 2023
2b25dfa
Add test for ANN models (except word2vec)
netang Mar 28, 2023
e2e1f2b
Merge remote-tracking branch 'sb-repo/main' into sb-main-ann
netang May 10, 2023
0b43d7b
Replace `functools.cached_property` with `cached_property.cached_prop…
netang May 10, 2023
23ef21d
Reformat docstring
netang May 10, 2023
7da34f2
Disable/Fix pylint warns. Add docstrings.
netang May 11, 2023
eb4329d
Fix pycodestyle warn.
netang May 11, 2023
f8302f5
Move `HnswlibIndexFileManager` and `NmslibIndexFileManager` to `repla…
netang May 16, 2023
e7caad0
Add `BaseHnswParam`. Fix sphinx warn.
netang May 16, 2023
c59c16a
Remove commented lines.
netang May 16, 2023
48d0b78
Add `DriverHnswlibIndexBuilder` and `ExecutorHnswlibIndexBuilder`
netang May 16, 2023
461cb7d
Add `DriverNmslibIndexBuilder` and `ExecutorNmslibIndexBuilder`
netang May 16, 2023
77787a1
Move `NeighbourRec` to `base_neighbour_rec.py`. Move ann mixins to an…
netang May 16, 2023
14cef4c
Add ANN to `ADMMSLIM` and `AssociationRulesItemRec` models
netang May 16, 2023
4020e56
Disable pylint R0902
netang May 16, 2023
1d3c8d3
Replace `typing.Literal` with `typing_extensions.Literal`
netang May 16, 2023
daaba5d
Fix tests
netang May 16, 2023
516bfd8
Make `HnswlibMixin` and `NmslibHnswMixin` abstract
netang May 17, 2023
3c48caf
Fix saving/loading
netang May 19, 2023
31bfd1a
Remove duplicated code
netang May 19, 2023
3a50457
Update index builders, add index stores and add index inferers.
netang May 21, 2023
8dd8fc0
Fix index saving/loading
netang May 28, 2023
2ded076
Add clean upping index files
netang May 28, 2023
f8c2959
Add tests of save/load ANN models
netang May 28, 2023
83f236a
Fix pylint warns
netang May 28, 2023
33fec4b
Fix pytest error
netang May 28, 2023
de9e622
Add new save/load tests. Fix error in test.
netang May 30, 2023
a44575a
Set poetry-core version interval
netang May 30, 2023
b9f0130
Revert "Set poetry-core version interval"
netang May 30, 2023
f7d42dc
Add `poetry-core` to dependencies
netang May 30, 2023
e763d43
Add tests
netang May 30, 2023
c6d8d80
Fix pycodestyle warn
netang May 30, 2023
281c816
Add `.coveragerc`
netang May 30, 2023
244ef18
Added LightFM wrapper for distributed training
zakharova-anastasiia May 31, 2023
2fdbacf
Merge branch 'sb-main-ann' into feature/gloo-spark-lightfm-wrapper-0.4
zakharova-anastasiia May 31, 2023
b510b64
Minor style changes
zakharova-anastasiia May 31, 2023
2 changes: 2 additions & 0 deletions .coveragerc
@@ -0,0 +1,2 @@
[run]
omit = replay/ann/index_stores/hdfs_index_store.py
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -46,4 +46,4 @@ jobs:
      - name: pytest
        run: |
          . ./venv/bin/activate
          pytest --cov=replay --cov-report=term-missing --doctest-modules replay --cov-fail-under=93 tests
          pytest --cov-config=.coveragerc --cov=replay --cov-report=term-missing --doctest-modules replay --cov-fail-under=93 tests
1,578 changes: 810 additions & 768 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pyproject.toml
@@ -30,6 +30,7 @@ packages = [

[tool.poetry.dependencies]
python = ">=3.7, <3.10"
poetry-core = "1.6.0"
lightfm = "*"
lightautoml = ">=0.3.1, <0.3.7"
numpy = ">=1.20.0"
@@ -46,6 +47,11 @@ seaborn = "*"
pyarrow = "*"
implicit = ">=0.5"
pytorch-ranger = "^0.1.1"
nmslib = "*"
hnswlib = "*"
cached-property = "*"
# extra packages
pygloo-rec = { version = "*", optional = true }

[tool.poetry.dev-dependencies]
# dev only
@@ -77,6 +83,9 @@ virtualenv = "*"
data-science-types = "*"
pyspark-stubs = "*"

[tool.poetry.extras]
distributed-lightfm = ["pygloo-rec"]

[tool.black]
line-length = 79

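The new `[tool.poetry.extras]` table makes the Gloo dependency opt-in. A hedged sketch of how a user would pull it in (the PyPI distribution name `replay-rec` is an assumption not confirmed by this diff; the Poetry command applies to a source checkout):

```shell
# From a source checkout managed by Poetry (assumed workflow):
poetry install -E distributed-lightfm

# Or from PyPI, if the project is published as "replay-rec" (assumption):
pip install "replay-rec[distributed-lightfm]"
```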
Empty file added replay/ann/__init__.py
Empty file.
202 changes: 202 additions & 0 deletions replay/ann/ann_mixin.py
@@ -0,0 +1,202 @@
import importlib
import logging
from abc import abstractmethod
from typing import Optional, Dict, Any

from pyspark.sql import DataFrame
from pyspark.sql import functions as sf

from replay.ann.index_builders.base_index_builder import IndexBuilder
from replay.ann.index_stores.spark_files_index_store import (
    SparkFilesIndexStore,
)
from replay.models.base_rec import BaseRecommender

logger = logging.getLogger("replay")


class ANNMixin(BaseRecommender):
    """
    This class overrides the `_fit_wrap` and `_inner_predict_wrap` methods
    of the base class, adding index construction at the `_fit_wrap` step
    and index inference at the `_inner_predict_wrap` step.
    """

    index_builder: Optional[IndexBuilder] = None

    @property
    def _use_ann(self) -> bool:
        """
        Property that determines whether the ANN (index) is used.
        If `True`, the index will be built (at the `fit` stage)
        and inferred (at the `predict` stage).
        """
        return self.index_builder is not None

    @abstractmethod
    def _get_vectors_to_build_ann(self, log: DataFrame) -> DataFrame:
        """Implementations of this method must return a dataframe with item vectors.
        Item vectors from this method are used to build the index.

        Args:
            log: DataFrame with interactions

        Returns: DataFrame[item_idx int, vector array<double>] or DataFrame[vector array<double>].
            Column names in the dataframe can be anything.
        """

    @abstractmethod
    def _get_ann_build_params(self, log: DataFrame) -> Dict[str, Any]:
        """Implementations of this method must return a dictionary
        with arguments for the index builder's `build_index` method.

        Args:
            log: DataFrame with interactions

        Returns: Dictionary with arguments to build the index. For example:
            {
                "id_col": "item_idx",
                "features_col": "item_factors",
                ...
            }
        """

    def _fit_wrap(
        self,
        log: DataFrame,
        user_features: Optional[DataFrame] = None,
        item_features: Optional[DataFrame] = None,
    ) -> None:
        """Wrapper that extends `_fit_wrap`, adding construction of the ANN index by flag.

        Args:
            log: historical log of interactions
                ``[user_idx, item_idx, timestamp, relevance]``
            user_features: user features
                ``[user_idx, timestamp]`` + feature columns
            item_features: item features
                ``[item_idx, timestamp]`` + feature columns
        """
        super()._fit_wrap(log, user_features, item_features)

        if self._use_ann:
            vectors = self._get_vectors_to_build_ann(log)
            ann_params = self._get_ann_build_params(log)
            self.index_builder.build_index(vectors, **ann_params)

    @abstractmethod
    def _get_vectors_to_infer_ann_inner(
        self, log: DataFrame, users: DataFrame
    ) -> DataFrame:
        """Implementations of this method must return a dataframe with user vectors.
        User vectors from this method are used to infer the index.

        Args:
            log: DataFrame with interactions
            users: DataFrame with users

        Returns: DataFrame[user_idx int, vector array<double>] or DataFrame[vector array<double>].
            The vector column name in the dataframe can be anything.
        """

    def _get_vectors_to_infer_ann(
        self, log: DataFrame, users: DataFrame, filter_seen_items: bool
    ) -> DataFrame:
        """This method wraps `_get_vectors_to_infer_ann_inner`
        and, by flag, adds seen items to the dataframe with user vectors.

        Args:
            log: DataFrame with interactions
            users: DataFrame with users
            filter_seen_items: flag to remove seen items from recommendations based on ``log``.

        Returns: DataFrame with user vectors
            (plus seen-items columns when `filter_seen_items` is set).
        """
        users = self._get_vectors_to_infer_ann_inner(log, users)

        # here we add `seen_item_idxs` to filter the viewed items in UDFs (see infer_index_udf)
        if filter_seen_items:
            user_to_max_items = log.groupBy("user_idx").agg(
                sf.count("item_idx").alias("num_items"),
                sf.collect_set("item_idx").alias("seen_item_idxs"),
            )
            users = users.join(user_to_max_items, on="user_idx")

        return users

    @abstractmethod
    def _get_ann_infer_params(self) -> Dict[str, Any]:
        """Implementations of this method must return a dictionary
        with arguments used to infer the index (see `_inner_predict_wrap`).

        Returns: Dictionary with arguments to infer the index. For example:
            {
                "features_col": "user_vector",
                ...
            }
        """

    def _inner_predict_wrap(  # pylint: disable=too-many-arguments
        self,
        log: DataFrame,
        k: int,
        users: DataFrame,
        items: DataFrame,
        user_features: Optional[DataFrame] = None,
        item_features: Optional[DataFrame] = None,
        filter_seen_items: bool = True,
    ) -> DataFrame:
        """Overrides the base `_inner_predict_wrap` and adds ANN inference by condition."""
        if self._use_ann:
            vectors = self._get_vectors_to_infer_ann(
                log, users, filter_seen_items
            )
            ann_params = self._get_ann_infer_params()
            inferer = self.index_builder.produce_inferer(filter_seen_items)
            return inferer.infer(vectors, ann_params["features_col"], k)

        return self._predict(
            log,
            k,
            users,
            items,
            user_features,
            item_features,
            filter_seen_items,
        )

    def _filter_seen(
        self, recs: DataFrame, log: DataFrame, k: int, users: DataFrame
    ):
        """
        Overrides the `_filter_seen` method from the base class.
        Filtering is not needed for ANN methods because the data is already filtered in the UDF.
        """
        if self._use_ann:
            return recs

        return super()._filter_seen(recs, log, k, users)

    def _save_index(self, path):
        self.index_builder.index_store.dump_index(path)

    def _load_index(self, path: str):
        self.index_builder.index_store = SparkFilesIndexStore()
        self.index_builder.index_store.load_from_path(path)

    def init_builder_from_dict(self, init_meta: dict):
        """Inits an index builder instance from a dict with init meta."""

        # index param entity instance initialization
        module = importlib.import_module(init_meta["index_param"]["module"])
        class_ = getattr(module, init_meta["index_param"]["class"])
        index_params = class_(**init_meta["index_param"]["init_args"])

        # index builder instance initialization
        module = importlib.import_module(init_meta["builder"]["module"])
        class_ = getattr(module, init_meta["builder"]["class"])
        index_builder = class_(index_params=index_params, index_store=None)

        self.index_builder = index_builder
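The fit/predict branching that `ANNMixin` introduces can be sketched with plain Python in place of Spark DataFrames and the real index builders; `ToyIndexBuilder` and `ToyANNModel` below are illustrative stand-ins, not repository classes:

```python
# Illustrative sketch of the branching ANNMixin adds. The real classes
# operate on Spark DataFrames and hnswlib/nmslib indices; here a dict
# stands in for the index.

class ToyIndexBuilder:
    """Builds a trivial 'index': a dict from item id to its vector."""

    def __init__(self):
        self.index = None

    def build_index(self, vectors, features_col):
        # The real builders construct an hnswlib/nmslib index here.
        self.index = dict(enumerate(vectors))


class ToyANNModel:
    """Mimics a recommender that mixes in ANN support."""

    def __init__(self, index_builder=None):
        self.index_builder = index_builder

    @property
    def _use_ann(self):
        # Mirrors ANNMixin._use_ann: ANN is on iff a builder is set.
        return self.index_builder is not None

    def fit(self, item_vectors):
        if self._use_ann:
            # Mirrors _fit_wrap: build the index right after fitting.
            self.index_builder.build_index(item_vectors, features_col="vector")

    def predict(self, k):
        if self._use_ann:
            # Approximate path: answer from the prebuilt index.
            return list(self.index_builder.index)[:k]
        # Exact path: fall back to the base recommender logic.
        return []


model = ToyANNModel(index_builder=ToyIndexBuilder())
model.fit([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
print(model.predict(k=2))  # item ids served from the toy index
```

When `index_builder` is `None` the mixin is inert and the exact path runs, which is why `_use_ann` gates every override.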
Empty file added replay/ann/entities/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions replay/ann/entities/base_hnsw_param.py
@@ -0,0 +1,32 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class BaseHnswParam:
    """
    Base hnsw params.
    """

    space: str
    m: int = 200  # pylint: disable=invalid-name
    ef_c: int = 20000
    post: int = 0
    ef_s: Optional[int] = None

    def init_meta_as_dict(self) -> dict:
        """
        Returns meta-information for class instance initialization. Used to save the entity to disk.

        :return: dictionary with init meta.
        """
        return {
            "module": type(self).__module__,
            "class": type(self).__name__,
            "init_args": {
                "space": self.space,
                "m": self.m,
                "ef_c": self.ef_c,
                "post": self.post,
                "ef_s": self.ef_s,
            },
        }
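The `init_meta_as_dict` contract pairs with `ANNMixin.init_builder_from_dict` above: the saved dict carries enough information to re-import the module and re-instantiate the class. A minimal round trip, using a local replica of the dataclass so the snippet is self-contained:

```python
import importlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class BaseHnswParam:
    """Local replica of replay/ann/entities/base_hnsw_param.py."""

    space: str
    m: int = 200
    ef_c: int = 20000
    post: int = 0
    ef_s: Optional[int] = None

    def init_meta_as_dict(self) -> dict:
        # Enough metadata to re-import and re-instantiate this class.
        return {
            "module": type(self).__module__,
            "class": type(self).__name__,
            "init_args": {
                "space": self.space,
                "m": self.m,
                "ef_c": self.ef_c,
                "post": self.post,
                "ef_s": self.ef_s,
            },
        }


param = BaseHnswParam(space="ip", m=16, ef_c=200, ef_s=200)
meta = param.init_meta_as_dict()

# Re-create the instance from the saved meta, as init_builder_from_dict does.
module = importlib.import_module(meta["module"])
cls = getattr(module, meta["class"])
restored = cls(**meta["init_args"])
assert restored == param
```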
66 changes: 66 additions & 0 deletions replay/ann/entities/hnswlib_param.py
@@ -0,0 +1,66 @@
from dataclasses import dataclass, field
from typing import Optional

from typing_extensions import Literal

from replay.ann.entities.base_hnsw_param import BaseHnswParam


@dataclass
class HnswlibParam(BaseHnswParam):
    """
    Parameters for hnswlib methods.

    For example,

    >>> HnswlibParam(space="ip",\
            m=100,\
            ef_c=200,\
            post=0,\
            ef_s=2000,\
        )
    HnswlibParam(space='ip', m=100, ef_c=200, post=0, ef_s=2000, dim=None, max_elements=None)

    The `space` parameter is described at
    https://github.com/nmslib/hnswlib/blob/master/README.md#supported-distances
    The `m`, `ef_s` and `ef_c` parameters are described at
    https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md

    The reasonable range of values for the `m` parameter is 5-100,
    and for `ef_c` and `ef_s` it is 100-2000.
    Increasing these values improves prediction quality
    but also increases index build time and inference time.

    We recommend using these settings:

    - m=16, ef_c=200 and ef_s=200 for simple datasets like MovieLens.
    - m=50, ef_c=1000 and ef_s=1000 for average quality with an average prediction time.
    - m=75, ef_c=2000 and ef_s=2000 for the highest quality with a long prediction time.

    note: the choice of these parameters depends on the dataset
    and the quality/time tradeoff.

    note: when reducing parameter values, high-range metrics
    such as Metric@1000 suffer first.

    note: even with a long index build time, ANN pays off
    when inference is run multiple times.
    """

    space: Literal["l2", "ip", "cosine"] = "ip"
    # Dimension of the vectors in the index
    dim: Optional[int] = field(default=None, init=False)
    # Max number of elements that will be stored in the index
    max_elements: Optional[int] = field(default=None, init=False)

    # def init_args_as_dict(self):
    #     # union dicts
    #     return dict(
    #         super().init_args_as_dict()["init_args"], **{"space": self.space}
    #     )
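The `dim` and `max_elements` fields use `field(init=False)`, so they are excluded from the generated constructor and filled in later, once the shape of the factor matrix is known. A self-contained sketch of this dataclass pattern (`HnswlibParamSketch` is a local stand-in, not the repository class):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class HnswlibParamSketch:
    """Local stand-in for HnswlibParam, illustrating init=False fields."""

    space: str = "ip"
    m: int = 16
    ef_c: int = 200
    ef_s: Optional[int] = 200
    # Not constructor arguments: set later, when the dimensionality and
    # size of the item-vector matrix are known.
    dim: Optional[int] = field(default=None, init=False)
    max_elements: Optional[int] = field(default=None, init=False)


params = HnswlibParamSketch(space="ip", m=16, ef_c=200, ef_s=200)
params.dim = 64               # e.g. number of latent factors
params.max_elements = 10_000  # e.g. number of items to index
print(params.dim, params.max_elements)
```

Passing `dim=` to the constructor raises `TypeError`, which keeps the user-facing parameters (`space`, `m`, `ef_c`, `ef_s`) separate from the internals the model fills in.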