diff --git a/examples/02_models_comparison.ipynb b/examples/02_models_comparison.ipynb index be13ff04a..948fb02e2 100644 --- a/examples/02_models_comparison.ipynb +++ b/examples/02_models_comparison.ipynb @@ -42,6 +42,7 @@ "end_time": "2020-02-10T16:01:45.639135Z", "start_time": "2020-02-10T16:01:45.612577Z" }, + "collapsed": false, "jupyter": { "outputs_hidden": false }, @@ -157,6 +158,7 @@ "end_time": "2020-02-10T15:59:09.227179Z", "start_time": "2020-02-10T15:59:06.427348Z" }, + "collapsed": false, "jupyter": { "outputs_hidden": false }, @@ -222,6 +224,7 @@ "end_time": "2020-02-10T15:59:42.041251Z", "start_time": "2020-02-10T15:59:09.230636Z" }, + "collapsed": false, "jupyter": { "outputs_hidden": false }, @@ -1813,7 +1816,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3.9.6 ('.venv': venv)", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -1827,7 +1830,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.9.7" }, "name": "movielens_nmf.ipynb", "pycharm": { diff --git a/examples/09_neural_ts_exp.ipynb b/examples/09_neural_ts_exp.ipynb new file mode 100644 index 000000000..8d0b538a5 --- /dev/null +++ b/examples/09_neural_ts_exp.ipynb @@ -0,0 +1,920 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b9ec062e-7d66-4c53-a480-bc6639d38e55", + "metadata": {}, + "source": [ + "We will show the main RePlay functionality and compare performance of RePlay models on well-known MovieLens dataset. For simplicity we consider here only the variou bandit algorithms with context. The list of considered strategies for comparison:\n", + "\n", + "1) Linear UCB\n", + "\n", + "2) Linear Thompson Sampling\n", + "\n", + "3) Logistic Thompson Sampling\n", + "\n", + "### Dataset\n", + "\n", + "We will compare RePlay models on MovieLens 1m.\n", + "\n", + "### Dataset preprocessing:\n", + "\n", + "Ratings greater than or equal to 3 are considered as positive interactions.\n", + "\n", + "### Data split\n", + "\n", + "Dataset is split by date so that 20% of the last interactions as are placed in the test part. Cold items and users are dropped.\n", + "\n", + "### Predict:\n", + "We will predict top-10 most relevant films for each user.\n", + "\n", + "### Metrics\n", + "\n", + "Quality metrics used: ndcg@k, hitrate@k, map@k, mrr@k for k = 1, 5, 10 Additional metrics used: coverage@k and surprisal@k.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "518b0d1a-a574-413b-928f-5755c763c405", + "metadata": {}, + "outputs": [], + "source": [ + "! 
pip install rs-datasets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7dd87a23-af97-44ca-b3f4-db88eb6bfb35", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42bf7dd6-e79f-4d73-a6be-f8df5d75d6d2", + "metadata": {}, + "outputs": [], + "source": [ + "%config Completer.use_jedi = False" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d9abd75-4180-4a67-9b07-eddac41ebbc1", + "metadata": {}, + "outputs": [], + "source": [ + "import warnings\n", + "from optuna.exceptions import ExperimentalWarning\n", + "warnings.filterwarnings(\"ignore\", category=UserWarning)\n", + "warnings.filterwarnings(\"ignore\", category=ExperimentalWarning)\n", + "warnings.simplefilter(action='ignore', category=FutureWarning)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9cdf0fa-0e15-4a5f-966d-d65e3eac7f2e", + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import time\n", + "import pandas as pd\n", + "import numpy as np\n", + "\n", + "from pyspark.sql import functions as sf, types as st\n", + "from pyspark.sql.types import IntegerType\n", + "\n", + "from replay.experimental.preprocessing.data_preparator import Indexer, DataPreparator\n", + "from replay.metrics import Coverage, HitRate, MRR, MAP, NDCG, Surprisal, Experiment\n", + "from replay.experimental.models import (\n", + " NeuralTS\n", + ")\n", + "\n", + "\n", + "from replay.utils.session_handler import State\n", + "from replay.splitters import TimeSplitter\n", + "from replay.utils.spark_utils import get_log_info\n", + "from rs_datasets import MovieLens" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9490b442-0f13-4106-ab81-c0c8c0e14ffb", + "metadata": {}, + "outputs": [], + "source": [ + "spark = State().session\n", + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "176cba50-02f2-4a0b-90da-31d6a4ca3be0", + "metadata": {}, + "outputs": [], + "source": [ + "spark.sparkContext.setLogLevel('ERROR')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "de6cfeb3-e9a8-40f0-a86a-29c3351c19e4", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "logger = logging.getLogger(\"replay\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e2e59462-943b-49e4-ad29-47c68655db10", + "metadata": {}, + "outputs": [], + "source": [ + "K = 10\n", + "K_list_metrics = [1, 5, 10]\n", + "BUDGET = 20\n", + "BUDGET_NN = 10\n", + "SEED = 12345" + ] + }, + { + "cell_type": "markdown", + "id": "a60db72f-6060-4727-9916-ffe170777cb6", + "metadata": {}, + "source": [ + "## Preprocessing " + ] + }, + { + "cell_type": "markdown", + "id": "6c4d61c4-3f36-4422-b4ea-8938fe53bcb3", + "metadata": {}, + "source": [ + "### Data loading" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afb72229-1f12-412e-bb6d-65e49f2acb5e", + "metadata": {}, + "outputs": [], + "source": [ + "data = MovieLens(\"1m\")\n", + "data.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "799925a2-9b6d-485e-b373-a7118a32fbf7", + "metadata": {}, + "outputs": [], + "source": [ + "data.ratings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "367862be-9ccd-4ae9-847b-6cd0ce4b82df", + "metadata": {}, + "outputs": [], + "source": [ + "preparator = DataPreparator()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"ba5deeec-7890-4273-b77f-2733c2c1c1b0", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "log = preparator.transform(columns_mapping={'user_id': 'user_id',\n", + " 'item_id': 'item_id',\n", + " 'relevance': 'rating',\n", + " 'timestamp': 'timestamp'\n", + " }, \n", + " data=data.ratings)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1f43241-03d3-44b6-9208-c9e40523d91f", + "metadata": {}, + "outputs": [], + "source": [ + "# will consider ratings >= 3 as positive feedback. A positive feedback is treated with relevance = 1\n", + "only_positives_log = log.filter(sf.col('relevance') >= 3).withColumn('relevance', sf.lit(1))\n", + "only_positives_log.count()" + ] + }, + { + "cell_type": "markdown", + "id": "3007ddfc-1b7b-45d6-b550-dc628b24d176", + "metadata": {}, + "source": [ + "### Indexing " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "102618c4-c2d1-4083-aef4-3329abeb798a", + "metadata": {}, + "outputs": [], + "source": [ + "indexer = Indexer(user_col='user_id', item_col='item_id')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "35619648-b44b-4b53-8809-0cc20e1ee7f9", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "indexer.fit(users=log.select('user_id'),\n", + " items=log.select('item_id'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a040889-0bff-476d-af92-3af77b323cc0", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "log_replay = indexer.transform(df=only_positives_log)\n", + "log_replay.show(2)\n", + "log_replay.count()" + ] + }, + { + "cell_type": "markdown", + "id": "9a8d487f-8a46-4470-9006-01c24ffc7b87", + "metadata": {}, + "source": [ + "### Data split" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90514332-4396-4b0f-a497-e28b679a9e31", + "metadata": {}, + "outputs": [], + "source": [ + "# train/test split \n", + "train_spl = TimeSplitter(\n", + " time_threshold=0.2,\n", + " drop_cold_items=True,\n", + " drop_cold_users=True,\n", + " query_column=\"user_idx\",\n", + " item_column=\"item_idx\",\n", + ")\n", + "\n", + "train, test = train_spl.split(log_replay)\n", + "print('train info:\\n', get_log_info(train))\n", + "print('test info:\\n', get_log_info(test))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bbef1ae0-df6f-42e8-8679-63ef36458dd6", + "metadata": {}, + "outputs": [], + "source": [ + "train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "843be418-7120-41d8-85a4-c4587872224f", + "metadata": {}, + "outputs": [], + "source": [ + "# train/test split for hyperparameters selection\n", + "opt_train, opt_val = train_spl.split(train)\n", + "opt_train.count(), opt_val.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "86661982-34f3-4c5a-bf4c-a543837f4d40", + "metadata": {}, + "outputs": [], + "source": [ + "opt_train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4502490b-59cb-4896-8b0f-e8985c6ad32c", + "metadata": {}, + "outputs": [], + "source": [ + "# negative feedback will be used for Wilson and UCB models\n", + "only_negatives_log = indexer.transform(df=log.filter(sf.col('relevance') < 3).withColumn('relevance', sf.lit(0.)))\n", + "test_start = test.agg(sf.min('timestamp')).collect()[0][0]\n", + "\n", + "# train with both positive and negative feedback\n", + "pos_neg_train=(train\n", + " .withColumn('relevance', sf.lit(1.))\n", + " 
.union(only_negatives_log.filter(sf.col('timestamp') < test_start))\n", + " )\n", + "pos_neg_train.cache()\n", + "pos_neg_train.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "914f8905-b6e3-411e-b468-2932e8af97db", + "metadata": {}, + "outputs": [], + "source": [ + "pos_neg_train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8ec5c4d-4302-49e6-8a97-0607a677cf13", + "metadata": {}, + "outputs": [], + "source": [ + "pos_neg_train.groupBy('relevance').count().show()" + ] + }, + { + "cell_type": "markdown", + "id": "e0d0faa9-35bf-4f39-baac-ed6ce1975e59", + "metadata": {}, + "source": [ + "### Item features " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a2bb8e3-838c-462d-8837-605beab5576d", + "metadata": {}, + "outputs": [], + "source": [ + "item_features_original = preparator.transform(columns_mapping={'item_id': 'item_id'}, \n", + " data=data.items)\n", + "item_features = indexer.transform(df=item_features_original)\n", + "item_features.show(2)\n", + "#different item features\n", + "\n", + "from pyspark.sql.functions import max,min\n", + "item_features.select(max(item_features.item_idx)).show()\n", + "item_features.select(min(item_features.item_idx)).show()\n", + "#just to check that the indexing is dense between 0 and 3882\n", + "item_features.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ef866ef5-5a58-435f-bd1a-98b1a84f791c", + "metadata": {}, + "outputs": [], + "source": [ + "year = item_features.withColumn('year', sf.substring(sf.col('title'), -5, 4).astype(st.IntegerType())).select('item_idx', 'year')\n", + "year.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "584986c0-ca89-4340-9cb2-c925d2e8b69d", + "metadata": {}, + "outputs": [], + "source": [ + "genres = (\n", + " item_features.select(\n", + " \"item_idx\",\n", + " sf.split(\"genres\", \"\\|\").alias(\"genres\")\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90f241fb-18e9-4824-b681-944bfb9d3173", + "metadata": {}, + "outputs": [], + "source": [ + "genres_list = (\n", + " genres.select(sf.explode(\"genres\").alias(\"genre\"))\n", + " .distinct().filter('genre <> \"(no genres listed)\"')\n", + " .toPandas()[\"genre\"].tolist()\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc361d69-2b2f-4e27-b477-a935b9695b31", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = genres\n", + "for genre in genres_list:\n", + " item_features = item_features.withColumn(\n", + " genre,\n", + " sf.array_contains(sf.col(\"genres\"), genre).astype(IntegerType())\n", + " )\n", + "item_features = item_features.drop(\"genres\").cache()\n", + "item_features.count()\n", + "item_features = item_features.join(year, on='item_idx', how='inner')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67b0e545-a8d1-4a53-8e08-122bd2dd2623", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = item_features.withColumnRenamed(\"Children's\",\"Children\")\n", + "item_features = item_features.toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "659878de-012b-4ea0-89ce-84514dcf5bac", + "metadata": {}, + "outputs": [], + "source": [ + "item_features.columns" + ] + }, + { + "cell_type": "markdown", + "id": "a4eddc96-9055-4ac1-8c54-47b19239973a", + "metadata": {}, + "source": [ + "### Users features" + ] + }, + { + "cell_type": "code", + "execution_count": 
null, + "id": "b7a2fddb-3f5d-49c1-8e15-e7d158678739", + "metadata": {}, + "outputs": [], + "source": [ + "data.users.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df210a6e-78ef-42d2-81ca-72edee25a4e3", + "metadata": {}, + "outputs": [], + "source": [ + "#same preprocessing for users as was done in 2.4.1.\n", + "user_features_original = preparator.transform(columns_mapping={'user_id': 'user_id'}, \n", + " data=data.users)\n", + "user_features = indexer.transform(df=user_features_original)\n", + "#switch for a while into pandas\n", + "user_features = user_features.toPandas()\n", + "user_features.head(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e90b8cd9-9f70-4bf4-8466-f833c62c28c6", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.functions import mean, stddev\n", + "user_train_mean = pos_neg_train.groupBy('user_idx').mean('relevance').withColumnRenamed('avg(relevance)','user_mean_rel').toPandas()\n", + "user_train_count = pos_neg_train.groupBy('user_idx').count().withColumnRenamed('count','user_count_rel').toPandas()\n", + "user_train_std = pos_neg_train.groupBy('user_idx').agg({'relevance': 'stddev'}).withColumnRenamed('stddev(relevance)','user_std_rel').toPandas()\n", + "user_train_median = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].agg('median')).rename(columns={'relevance': 'user_median_rel'})\n", + "user_train_kurt = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].apply(pd.DataFrame.kurt)).rename(columns={'relevance': 'user_kurt_rel'})\n", + "user_train_skew = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].apply(pd.DataFrame.skew)).rename(columns={'relevance': 'user_skew_rel'})\n", + "\n", + "item_train_mean = pos_neg_train.groupBy('item_idx').mean('relevance').withColumnRenamed('avg(relevance)','item_mean_rel').toPandas()\n", + "item_train_count = pos_neg_train.groupBy('item_idx').count().withColumnRenamed('count','item_count_rel').toPandas()\n", + "item_train_std = pos_neg_train.groupBy('item_idx').agg({'relevance': 'stddev'}).withColumnRenamed('stddev(relevance)','item_std_rel').toPandas()\n", + "item_train_median = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].agg('median')).rename(columns={'relevance': 'item_median_rel'})\n", + "item_train_kurt = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].apply(pd.DataFrame.kurt)).rename(columns={'relevance': 'item_kurt_rel'})\n", + "item_train_skew = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].apply(pd.DataFrame.skew)).rename(columns={'relevance': 'item_skew_rel'})\n", + "\n", + "full_mean = pos_neg_train.select(mean('relevance')).toPandas().values.squeeze()\n", + "full_count = 0\n", + "full_std = pos_neg_train.agg({'relevance': 'stddev'}).toPandas().values.squeeze()\n", + "full_median = pos_neg_train.toPandas()['relevance'].median().squeeze()\n", + "full_kurt = pos_neg_train.toPandas()['relevance'].kurt().squeeze()\n", + "full_skew = pos_neg_train.toPandas()['relevance'].skew().squeeze()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "422cedfe-76d7-4064-a713-6514d4b5deaf", + "metadata": {}, + "outputs": [], + "source": [ + "user_train_sum = pos_neg_train.groupBy('user_idx').sum('relevance').withColumnRenamed('sum(relevance)','user_sum_rel').toPandas()\n", + "item_train_sum = 
pos_neg_train.groupBy('item_idx').sum('relevance').withColumnRenamed('sum(relevance)','item_sum_rel').toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa549b76-ab84-4eb0-8e90-da49717868e5", + "metadata": {}, + "outputs": [], + "source": [ + "item_train_sum.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b7f3532-4404-4b20-ab55-3a9bb54443be", + "metadata": {}, + "outputs": [], + "source": [ + "user_all_true = pd.DataFrame({'user_idx': user_train_sum['user_idx'].values , \n", + " 'user_all_true': (user_train_sum['user_sum_rel'] == user_train_count['user_count_rel'].values).astype(int)})\n", + "item_all_true = pd.DataFrame({'item_idx': item_train_sum['item_idx'].values , \n", + " 'item_all_true': (item_train_sum['item_sum_rel'] == item_train_count['item_count_rel'].values).astype(int)})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fae08c53-1ce3-4675-b2ab-86e91b437512", + "metadata": {}, + "outputs": [], + "source": [ + "pop_item = pd.DataFrame({'item_idx': item_train_mean['item_idx'].values , \n", + " 'item_is_pop': (item_train_mean['item_mean_rel'] > 0.9).astype(int)})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8cf10e7-6897-4ce0-9c9b-1426c4e746e1", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.preprocessing import OneHotEncoder\n", + "print(\"max ocupation index: \", user_features['occupation'].max())\n", + "print(\"min ocupation index: \", user_features['occupation'].min())\n", + "count_diff_zips = user_features['zip_code'].unique().size\n", + "print(\"different zip codes: \", count_diff_zips) \n", + "users_pd = user_features\n", + "users_pd['zip_code'] = users_pd['zip_code'].apply(lambda x: x[0])\n", + "\n", + "#binarize age variable\n", + "bins = [0, 20, 30, 40, 50, 60, np.inf]\n", + "names = ['<20', '20-29', '30-39','40-49', '51-60', '60+']\n", + "\n", + "users_pd['agegroup'] = pd.cut(users_pd['age'], bins, labels=names)\n", + "# users_pd = users_pd.drop([\"age\"], axis = 1)\n", + "# users_pd.head()\n", + "\n", + "# #binarize following https://github.com/kfoofw/bandit_simulations/tree/master\n", + "# columnsToEncode = [\"agegroup\",\"gender\",\"occupation\"]\n", + "# myEncoder = OneHotEncoder(sparse=False, handle_unknown='ignore')\n", + "# myEncoder.fit(users_pd[columnsToEncode])\n", + "\n", + "# users_pd = pd.concat([users_pd.drop(columnsToEncode, 1),\n", + "# pd.DataFrame(myEncoder.transform(users_pd[columnsToEncode]), \n", + "# columns = myEncoder.get_feature_names_out(columnsToEncode))], axis=1).reindex()\n", + "users_pd['gender_occupation'] = users_pd['gender'] + '_' + users_pd['occupation'].astype(str)\n", + "users_pd.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b669a3c4-6be1-438f-8db4-8b85003e79f5", + "metadata": {}, + "outputs": [], + "source": [ + "#stats\n", + "users_pd = users_pd.merge(user_train_mean, on='user_idx', how='left')\n", + "users_pd['user_mean_rel'].fillna(full_mean, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_count, on='user_idx', how='left')\n", + "users_pd['user_count_rel'].fillna(full_count, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_std, on='user_idx', how='left')\n", + "users_pd['user_std_rel'].fillna(full_std, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_median, on='user_idx', how='left')\n", + "users_pd['user_median_rel'].fillna(full_median, inplace = True)\n", + "\n", + "users_pd = 
users_pd.merge(user_train_kurt, on='user_idx', how='left')\n", + "users_pd['user_kurt_rel'].fillna(full_kurt, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_skew, on='user_idx', how='left')\n", + "users_pd['user_skew_rel'].fillna(full_skew, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_all_true, on='user_idx', how='left')\n", + "users_pd['user_all_true'].fillna(0, inplace = True)\n", + "\n", + "\n", + "item_features = item_features.merge(item_train_mean, on='item_idx', how='left')\n", + "item_features['item_mean_rel'].fillna(full_mean, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_count, on='item_idx', how='left')\n", + "item_features['item_count_rel'].fillna(full_count, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_std, on='item_idx', how='left')\n", + "item_features['item_std_rel'].fillna(full_std, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_median, on='item_idx', how='left')\n", + "item_features['item_median_rel'].fillna(full_median, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_kurt, on='item_idx', how='left')\n", + "item_features['item_kurt_rel'].fillna(full_kurt, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_skew, on='item_idx', how='left')\n", + "item_features['item_skew_rel'].fillna(full_skew, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_all_true, on='item_idx', how='left')\n", + "item_features['item_all_true'].fillna(0, inplace = True)\n", + "\n", + "item_features = item_features.merge(pop_item, on='item_idx', how='left')\n", + "item_features['item_is_pop'].fillna(0, inplace = True)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7948ea64-1786-42f0-bc01-f0172d41fd63", + "metadata": {}, + "outputs": [], + "source": [ + "#make it pyspark\n", + "user_features = spark.createDataFrame(users_pd) \n", + "user_features.cache()\n", + "user_features.printSchema()\n", + "print(\"total users: \",user_features.count())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "005148e2-4bd9-4e4e-9bec-5622ca34be3a", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = spark.createDataFrame(item_features) \n", + "item_features.cache()\n", + "item_features.printSchema()\n", + "print(\"total users: \",item_features.count())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be6cf4d6-4ec2-40eb-a86d-4c82c7350069", + "metadata": {}, + "outputs": [], + "source": [ + "# item_features = indexer.transform(df=item_features)\n", + "# user_features = indexer.transform(df=user_features)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e45a708-0d64-4e1f-85e2-d2b91404cb9f", + "metadata": {}, + "outputs": [], + "source": [ + "cat_embed_cols = [\"agegroup\", 'gender', 'occupation', \"zip_code\"]\n", + "continuous_cols = [\"Crime\", \"Sci-Fi\", \"Musical\", \n", + " \"Mystery\", \"Documentary\", \"Fantasy\", \"Children\", \n", + " \"Drama\", \"Horror\", \"Adventure\", \"Western\",\n", + " \"Romance\", \"War\", \"Animation\", \"Action\", \"Comedy\", \"Thriller\",\n", + " \"Film-Noir\"]\n", + " # , \"age\",\n", + " # 'user_mean_rel', 'user_count_rel', 'user_std_rel',\n", + " # 'user_median_rel', 'user_kurt_rel', 'user_skew_rel',\n", + " # 'item_mean_rel', 'item_count_rel', 'item_std_rel',\n", + " # 'item_median_rel', 'item_kurt_rel', 'item_skew_rel']\n", + "wide_cols = ['gender', 
'occupation']\n", + "crossed_cols = ['gender_occupation']\n", + "\n", + "cols_item = {'continuous_cols':[], 'cat_embed_cols':[], 'wide_cols': []}\n", + "\n", + "cols_user = {'continuous_cols':[], 'cat_embed_cols':[], 'wide_cols': []}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8fc70e61-6e57-475b-a584-28f2143c978a", + "metadata": {}, + "outputs": [], + "source": [ + "e = Experiment(\n", + " [ MAP(K), \n", + " NDCG(K), \n", + " HitRate(K_list_metrics), \n", + " Coverage(K),\n", + " Surprisal(K),\n", + " MRR(K)],\n", + " test,\n", + " pos_neg_train,\n", + " query_column=\"user_idx\", item_column=\"item_idx\", rating_column=\"relevance\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1239639-00cd-4ab1-acdc-3c6d7b00870a", + "metadata": {}, + "outputs": [], + "source": [ + " bandit_models = {\n", + " 'Neural TS' : [NeuralTS(user_cols = cols_user,\n", + " item_cols = cols_item,\n", + " dim_head=20,\n", + " deep_out_dim=20,\n", + " hidden_layers=[32, 20],\n", + " embedding_sizes=[32, 32, 64],\n", + " wide_out_dim=1,\n", + " head_dropout=0.8,\n", + " deep_dropout=0.4,\n", + " n_epochs=2,\n", + " opt_lr = 3e-4,\n", + " lr_min = 1e-5,\n", + " use_gpu = False,\n", + " plot_dir='test.png',\n", + " is_warp_loss=True,\n", + " cnt_neg_samples=200,\n", + " cnt_samples_for_predict=10,\n", + " eps = +1.0\n", + " ), 'no_opt'] \n", + " }" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df0171a4-8c59-4c30-8034-23644dd0fe06", + "metadata": {}, + "outputs": [], + "source": [ + "def fit_predict_add_res(name, model, experiment, train, test, suffix=''):\n", + " \"\"\"\n", + " Run fit_predict for the `model`, measure time on fit_predict and evaluate metrics\n", + " \"\"\"\n", + " start_time=time.time()\n", + " \n", + " logs = {'log': train}\n", + " predict_params = {'k': K, 'users': test.select('user_idx').distinct()}\n", + " \n", + " if isinstance(model, (NeuralTS)):\n", + " logs['log'] = pos_neg_train\n", + "\n", + " if isinstance(model, (NeuralTS)):\n", + " logs['item_features'] = item_features\n", + " logs['user_features'] = user_features \n", + " predict_params.update(logs)\n", + "\n", + " model.fit(**logs)\n", + " fit_time = time.time() - start_time\n", + " \n", + " pred=model.predict(**predict_params)\n", + " pred.show(100)\n", + " pred.cache()\n", + " pred.count()\n", + " predict_time = time.time() - start_time - fit_time\n", + "\n", + " experiment.add_result(name + suffix, pred)\n", + " metric_time = time.time() - start_time - fit_time - predict_time\n", + " experiment.results.loc[name + suffix, 'fit_time'] = fit_time\n", + " experiment.results.loc[name + suffix, 'predict_time'] = predict_time\n", + " experiment.results.loc[name + suffix, 'metric_time'] = metric_time\n", + " experiment.results.loc[name + suffix, 'full_time'] = (fit_time + \n", + " predict_time +\n", + " metric_time)\n", + " pred.unpersist()\n", + " print(experiment.results[['NDCG@{}'.format(K), 'MRR@{}'.format(K), 'Coverage@{}'.format(K), 'fit_time']].sort_values('NDCG@{}'.format(K), ascending=False))\n", + " #add for me\n", + " return pred" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5e270a6-1077-43ef-86c0-9eed0aadd269", + "metadata": {}, + "outputs": [], + "source": [ + "def full_pipeline(models, experiment, train, test, suffix='', budget=BUDGET):\n", + " \"\"\"\n", + " For each model:\n", + " - if required: run hyperparameters search, set best params and save param values to `experiment`\n", + " - pass model to 
`fit_predict_add_res` \n", + " \"\"\"\n", + " \n", + " for name, [model, params] in models.items():\n", + " model.logger.info(msg='{} started'.format(name))\n", + " if params != 'no_opt':\n", + " model.logger.info(msg='{} optimization started'.format(name))\n", + " best_params = model.optimize(opt_train, \n", + " opt_val, \n", + " param_borders=params, \n", + " item_features=item_features,\n", + " user_features=user_features,\n", + " k=K, \n", + " budget=budget)\n", + " logger.info(msg='best params for {} are: {}'.format(name, best_params))\n", + " model.set_params(**best_params)\n", + " \n", + " logger.info(msg='{} fit_predict started'.format(name))\n", + " pred = fit_predict_add_res(name, model, experiment, train, test, suffix)\n", + " # here we call protected attribute to get all parameters set during model initialization\n", + " experiment.results.loc[name + suffix, 'params'] = str(model._init_args)\n", + " #add for me\n", + " return pred" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca54985e-3e70-451a-bd03-25cd8c2c2326", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "pred = full_pipeline(bandit_models, e, train, test)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "425f2216-b01f-4c5a-bc63-dc5521bcefd5", + "metadata": {}, + "outputs": [], + "source": [ + "e.results.sort_values('NDCG@10', ascending=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/12_neural_ts_exp.ipynb b/examples/12_neural_ts_exp.ipynb new file mode 100644 index 000000000..b353ed32a --- /dev/null +++ b/examples/12_neural_ts_exp.ipynb @@ -0,0 +1,919 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b9ec062e-7d66-4c53-a480-bc6639d38e55", + "metadata": {}, + "source": [ + "We will show the main RePlay functionality and compare performance of RePlay models on well-known MovieLens dataset. For simplicity we consider here only the variou bandit algorithms with context. The list of considered strategies for comparison:\n", + "\n", + "1) Linear UCB\n", + "\n", + "2) Linear Thompson Sampling\n", + "\n", + "3) Logistic Thompson Sampling\n", + "\n", + "### Dataset\n", + "\n", + "We will compare RePlay models on MovieLens 1m.\n", + "\n", + "### Dataset preprocessing:\n", + "\n", + "Ratings greater than or equal to 3 are considered as positive interactions.\n", + "\n", + "### Data split\n", + "\n", + "Dataset is split by date so that 20% of the last interactions as are placed in the test part. Cold items and users are dropped.\n", + "\n", + "### Predict:\n", + "We will predict top-10 most relevant films for each user.\n", + "\n", + "### Metrics\n", + "\n", + "Quality metrics used: ndcg@k, hitrate@k, map@k, mrr@k for k = 1, 5, 10 Additional metrics used: coverage@k and surprisal@k.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "518b0d1a-a574-413b-928f-5755c763c405", + "metadata": {}, + "outputs": [], + "source": [ + "! 
pip install rs-datasets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7dd87a23-af97-44ca-b3f4-db88eb6bfb35", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42bf7dd6-e79f-4d73-a6be-f8df5d75d6d2", + "metadata": {}, + "outputs": [], + "source": [ + "%config Completer.use_jedi = False" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d9abd75-4180-4a67-9b07-eddac41ebbc1", + "metadata": {}, + "outputs": [], + "source": [ + "import warnings\n", + "from optuna.exceptions import ExperimentalWarning\n", + "warnings.filterwarnings(\"ignore\", category=UserWarning)\n", + "warnings.filterwarnings(\"ignore\", category=ExperimentalWarning)\n", + "warnings.simplefilter(action='ignore', category=FutureWarning)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9cdf0fa-0e15-4a5f-966d-d65e3eac7f2e", + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import time\n", + "import pandas as pd\n", + "import numpy as np\n", + "\n", + "from pyspark.sql import functions as sf, types as st\n", + "from pyspark.sql.types import IntegerType\n", + "\n", + "from replay.experimental.preprocessing.data_preparator import Indexer, DataPreparator\n", + "from replay.metrics import Coverage, HitRate, MRR, MAP, NDCG, Surprisal, Experiment\n", + "from replay.experimental.models import (\n", + " NeuralTS\n", + ")\n", + "\n", + "\n", + "from replay.utils.session_handler import State\n", + "from replay.splitters import TimeSplitter\n", + "from replay.utils.spark_utils import get_log_info\n", + "from rs_datasets import MovieLens" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9490b442-0f13-4106-ab81-c0c8c0e14ffb", + "metadata": {}, + "outputs": [], + "source": [ + "spark = State().session\n", + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "176cba50-02f2-4a0b-90da-31d6a4ca3be0", + "metadata": {}, + "outputs": [], + "source": [ + "spark.sparkContext.setLogLevel('ERROR')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "de6cfeb3-e9a8-40f0-a86a-29c3351c19e4", + "metadata": {}, + "outputs": [], + "source": [ + "logger = logging.getLogger(\"replay\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e2e59462-943b-49e4-ad29-47c68655db10", + "metadata": {}, + "outputs": [], + "source": [ + "K = 10\n", + "K_list_metrics = [1, 5, 10]\n", + "BUDGET = 20\n", + "BUDGET_NN = 10\n", + "SEED = 12345" + ] + }, + { + "cell_type": "markdown", + "id": "a60db72f-6060-4727-9916-ffe170777cb6", + "metadata": {}, + "source": [ + "## Preprocessing " + ] + }, + { + "cell_type": "markdown", + "id": "6c4d61c4-3f36-4422-b4ea-8938fe53bcb3", + "metadata": {}, + "source": [ + "### Data loading" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afb72229-1f12-412e-bb6d-65e49f2acb5e", + "metadata": {}, + "outputs": [], + "source": [ + "data = MovieLens(\"1m\")\n", + "data.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "799925a2-9b6d-485e-b373-a7118a32fbf7", + "metadata": {}, + "outputs": [], + "source": [ + "data.ratings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "367862be-9ccd-4ae9-847b-6cd0ce4b82df", + "metadata": {}, + "outputs": [], + "source": [ + "preparator = DataPreparator()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"ba5deeec-7890-4273-b77f-2733c2c1c1b0", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "log = preparator.transform(columns_mapping={'user_id': 'user_id',\n", + " 'item_id': 'item_id',\n", + " 'relevance': 'rating',\n", + " 'timestamp': 'timestamp'\n", + " }, \n", + " data=data.ratings)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1f43241-03d3-44b6-9208-c9e40523d91f", + "metadata": {}, + "outputs": [], + "source": [ + "# will consider ratings >= 3 as positive feedback. A positive feedback is treated with relevance = 1\n", + "only_positives_log = log.filter(sf.col('relevance') >= 3).withColumn('relevance', sf.lit(1))\n", + "only_positives_log.count()" + ] + }, + { + "cell_type": "markdown", + "id": "3007ddfc-1b7b-45d6-b550-dc628b24d176", + "metadata": {}, + "source": [ + "### Indexing " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "102618c4-c2d1-4083-aef4-3329abeb798a", + "metadata": {}, + "outputs": [], + "source": [ + "indexer = Indexer(user_col='user_id', item_col='item_id')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "35619648-b44b-4b53-8809-0cc20e1ee7f9", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "indexer.fit(users=log.select('user_id'),\n", + " items=log.select('item_id'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a040889-0bff-476d-af92-3af77b323cc0", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "log_replay = indexer.transform(df=only_positives_log)\n", + "log_replay.show(2)\n", + "log_replay.count()" + ] + }, + { + "cell_type": "markdown", + "id": "9a8d487f-8a46-4470-9006-01c24ffc7b87", + "metadata": {}, + "source": [ + "### Data split" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90514332-4396-4b0f-a497-e28b679a9e31", + "metadata": {}, + "outputs": [], + "source": [ + "# train/test split \n", + "train_spl = TimeSplitter(\n", + " time_threshold=0.2,\n", + " drop_cold_items=True,\n", + " drop_cold_users=True,\n", + " query_column=\"user_idx\",\n", + " item_column=\"item_idx\",\n", + ")\n", + "\n", + "train, test = train_spl.split(log_replay)\n", + "print('train info:\\n', get_log_info(train))\n", + "print('test info:\\n', get_log_info(test))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bbef1ae0-df6f-42e8-8679-63ef36458dd6", + "metadata": {}, + "outputs": [], + "source": [ + "train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "843be418-7120-41d8-85a4-c4587872224f", + "metadata": {}, + "outputs": [], + "source": [ + "# train/test split for hyperparameters selection\n", + "opt_train, opt_val = train_spl.split(train)\n", + "opt_train.count(), opt_val.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "86661982-34f3-4c5a-bf4c-a543837f4d40", + "metadata": {}, + "outputs": [], + "source": [ + "opt_train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4502490b-59cb-4896-8b0f-e8985c6ad32c", + "metadata": {}, + "outputs": [], + "source": [ + "# negative feedback will be used for Wilson and UCB models\n", + "only_negatives_log = indexer.transform(df=log.filter(sf.col('relevance') < 3).withColumn('relevance', sf.lit(0.)))\n", + "test_start = test.agg(sf.min('timestamp')).collect()[0][0]\n", + "\n", + "# train with both positive and negative feedback\n", + "pos_neg_train=(train\n", + " .withColumn('relevance', sf.lit(1.))\n", + " 
.union(only_negatives_log.filter(sf.col('timestamp') < test_start))\n", + " )\n", + "pos_neg_train.cache()\n", + "pos_neg_train.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "914f8905-b6e3-411e-b468-2932e8af97db", + "metadata": {}, + "outputs": [], + "source": [ + "pos_neg_train.is_cached" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8ec5c4d-4302-49e6-8a97-0607a677cf13", + "metadata": {}, + "outputs": [], + "source": [ + "pos_neg_train.groupBy('relevance').count().show()" + ] + }, + { + "cell_type": "markdown", + "id": "e0d0faa9-35bf-4f39-baac-ed6ce1975e59", + "metadata": {}, + "source": [ + "### Item features " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a2bb8e3-838c-462d-8837-605beab5576d", + "metadata": {}, + "outputs": [], + "source": [ + "item_features_original = preparator.transform(columns_mapping={'item_id': 'item_id'}, \n", + " data=data.items)\n", + "item_features = indexer.transform(df=item_features_original)\n", + "item_features.show(2)\n", + "#different item features\n", + "\n", + "from pyspark.sql.functions import max,min\n", + "item_features.select(max(item_features.item_idx)).show()\n", + "item_features.select(min(item_features.item_idx)).show()\n", + "#just to check that the indexing is dense between 0 and 3882\n", + "item_features.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ef866ef5-5a58-435f-bd1a-98b1a84f791c", + "metadata": {}, + "outputs": [], + "source": [ + "year = item_features.withColumn('year', sf.substring(sf.col('title'), -5, 4).astype(st.IntegerType())).select('item_idx', 'year')\n", + "year.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "584986c0-ca89-4340-9cb2-c925d2e8b69d", + "metadata": {}, + "outputs": [], + "source": [ + "genres = (\n", + " item_features.select(\n", + " \"item_idx\",\n", + " sf.split(\"genres\", \"\\|\").alias(\"genres\")\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90f241fb-18e9-4824-b681-944bfb9d3173", + "metadata": {}, + "outputs": [], + "source": [ + "genres_list = (\n", + " genres.select(sf.explode(\"genres\").alias(\"genre\"))\n", + " .distinct().filter('genre <> \"(no genres listed)\"')\n", + " .toPandas()[\"genre\"].tolist()\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc361d69-2b2f-4e27-b477-a935b9695b31", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = genres\n", + "for genre in genres_list:\n", + " item_features = item_features.withColumn(\n", + " genre,\n", + " sf.array_contains(sf.col(\"genres\"), genre).astype(IntegerType())\n", + " )\n", + "item_features = item_features.drop(\"genres\").cache()\n", + "item_features.count()\n", + "item_features = item_features.join(year, on='item_idx', how='inner')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67b0e545-a8d1-4a53-8e08-122bd2dd2623", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = item_features.withColumnRenamed(\"Children's\",\"Children\")\n", + "item_features = item_features.toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "659878de-012b-4ea0-89ce-84514dcf5bac", + "metadata": {}, + "outputs": [], + "source": [ + "item_features.columns" + ] + }, + { + "cell_type": "markdown", + "id": "a4eddc96-9055-4ac1-8c54-47b19239973a", + "metadata": {}, + "source": [ + "### Users features" + ] + }, + { + "cell_type": "code", + "execution_count": 
null, + "id": "b7a2fddb-3f5d-49c1-8e15-e7d158678739", + "metadata": {}, + "outputs": [], + "source": [ + "data.users.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df210a6e-78ef-42d2-81ca-72edee25a4e3", + "metadata": {}, + "outputs": [], + "source": [ + "#same preprocessing for users as was done in 2.4.1.\n", + "user_features_original = preparator.transform(columns_mapping={'user_id': 'user_id'}, \n", + " data=data.users)\n", + "user_features = indexer.transform(df=user_features_original)\n", + "#switch for a while into pandas\n", + "user_features = user_features.toPandas()\n", + "user_features.head(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e90b8cd9-9f70-4bf4-8466-f833c62c28c6", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.functions import mean, stddev\n", + "user_train_mean = pos_neg_train.groupBy('user_idx').mean('relevance').withColumnRenamed('avg(relevance)','user_mean_rel').toPandas()\n", + "user_train_count = pos_neg_train.groupBy('user_idx').count().withColumnRenamed('count','user_count_rel').toPandas()\n", + "user_train_std = pos_neg_train.groupBy('user_idx').agg({'relevance': 'stddev'}).withColumnRenamed('stddev(relevance)','user_std_rel').toPandas()\n", + "user_train_median = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].agg('median')).rename(columns={'relevance': 'user_median_rel'})\n", + "user_train_kurt = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].apply(pd.DataFrame.kurt)).rename(columns={'relevance': 'user_kurt_rel'})\n", + "user_train_skew = pd.DataFrame(pos_neg_train.toPandas().groupby('user_idx', as_index = False)['relevance'].apply(pd.DataFrame.skew)).rename(columns={'relevance': 'user_skew_rel'})\n", + "\n", + "item_train_mean = pos_neg_train.groupBy('item_idx').mean('relevance').withColumnRenamed('avg(relevance)','item_mean_rel').toPandas()\n", + "item_train_count = pos_neg_train.groupBy('item_idx').count().withColumnRenamed('count','item_count_rel').toPandas()\n", + "item_train_std = pos_neg_train.groupBy('item_idx').agg({'relevance': 'stddev'}).withColumnRenamed('stddev(relevance)','item_std_rel').toPandas()\n", + "item_train_median = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].agg('median')).rename(columns={'relevance': 'item_median_rel'})\n", + "item_train_kurt = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].apply(pd.DataFrame.kurt)).rename(columns={'relevance': 'item_kurt_rel'})\n", + "item_train_skew = pd.DataFrame(pos_neg_train.toPandas().groupby('item_idx', as_index = False)['relevance'].apply(pd.DataFrame.skew)).rename(columns={'relevance': 'item_skew_rel'})\n", + "\n", + "full_mean = pos_neg_train.select(mean('relevance')).toPandas().values.squeeze()\n", + "full_count = 0\n", + "full_std = pos_neg_train.agg({'relevance': 'stddev'}).toPandas().values.squeeze()\n", + "full_median = pos_neg_train.toPandas()['relevance'].median().squeeze()\n", + "full_kurt = pos_neg_train.toPandas()['relevance'].kurt().squeeze()\n", + "full_skew = pos_neg_train.toPandas()['relevance'].skew().squeeze()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "422cedfe-76d7-4064-a713-6514d4b5deaf", + "metadata": {}, + "outputs": [], + "source": [ + "user_train_sum = pos_neg_train.groupBy('user_idx').sum('relevance').withColumnRenamed('sum(relevance)','user_sum_rel').toPandas()\n", + "item_train_sum = 
pos_neg_train.groupBy('item_idx').sum('relevance').withColumnRenamed('sum(relevance)','item_sum_rel').toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa549b76-ab84-4eb0-8e90-da49717868e5", + "metadata": {}, + "outputs": [], + "source": [ + "item_train_sum.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b7f3532-4404-4b20-ab55-3a9bb54443be", + "metadata": {}, + "outputs": [], + "source": [ + "user_all_true = pd.DataFrame({'user_idx': user_train_sum['user_idx'].values , \n", + " 'user_all_true': (user_train_sum['user_sum_rel'] == user_train_count['user_count_rel'].values).astype(int)})\n", + "item_all_true = pd.DataFrame({'item_idx': item_train_sum['item_idx'].values , \n", + " 'item_all_true': (item_train_sum['item_sum_rel'] == item_train_count['item_count_rel'].values).astype(int)})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fae08c53-1ce3-4675-b2ab-86e91b437512", + "metadata": {}, + "outputs": [], + "source": [ + "pop_item = pd.DataFrame({'item_idx': item_train_mean['item_idx'].values , \n", + " 'item_is_pop': (item_train_mean['item_mean_rel'] > 0.9).astype(int)})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8cf10e7-6897-4ce0-9c9b-1426c4e746e1", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.preprocessing import OneHotEncoder\n", + "print(\"max ocupation index: \", user_features['occupation'].max())\n", + "print(\"min ocupation index: \", user_features['occupation'].min())\n", + "count_diff_zips = user_features['zip_code'].unique().size\n", + "print(\"different zip codes: \", count_diff_zips) \n", + "users_pd = user_features\n", + "users_pd['zip_code'] = users_pd['zip_code'].apply(lambda x: x[0])\n", + "\n", + "#binarize age variable\n", + "bins = [0, 20, 30, 40, 50, 60, np.inf]\n", + "names = ['<20', '20-29', '30-39','40-49', '51-60', '60+']\n", + "\n", + "users_pd['agegroup'] = pd.cut(users_pd['age'], bins, labels=names)\n", + "# users_pd = users_pd.drop([\"age\"], axis = 1)\n", + "# users_pd.head()\n", + "\n", + "# #binarize following https://github.com/kfoofw/bandit_simulations/tree/master\n", + "# columnsToEncode = [\"agegroup\",\"gender\",\"occupation\"]\n", + "# myEncoder = OneHotEncoder(sparse=False, handle_unknown='ignore')\n", + "# myEncoder.fit(users_pd[columnsToEncode])\n", + "\n", + "# users_pd = pd.concat([users_pd.drop(columnsToEncode, 1),\n", + "# pd.DataFrame(myEncoder.transform(users_pd[columnsToEncode]), \n", + "# columns = myEncoder.get_feature_names_out(columnsToEncode))], axis=1).reindex()\n", + "users_pd['gender_occupation'] = users_pd['gender'] + '_' + users_pd['occupation'].astype(str)\n", + "users_pd.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b669a3c4-6be1-438f-8db4-8b85003e79f5", + "metadata": {}, + "outputs": [], + "source": [ + "#stats\n", + "users_pd = users_pd.merge(user_train_mean, on='user_idx', how='left')\n", + "users_pd['user_mean_rel'].fillna(full_mean, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_count, on='user_idx', how='left')\n", + "users_pd['user_count_rel'].fillna(full_count, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_std, on='user_idx', how='left')\n", + "users_pd['user_std_rel'].fillna(full_std, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_median, on='user_idx', how='left')\n", + "users_pd['user_median_rel'].fillna(full_median, inplace = True)\n", + "\n", + "users_pd = 
users_pd.merge(user_train_kurt, on='user_idx', how='left')\n", + "users_pd['user_kurt_rel'].fillna(full_kurt, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_train_skew, on='user_idx', how='left')\n", + "users_pd['user_skew_rel'].fillna(full_skew, inplace = True)\n", + "\n", + "users_pd = users_pd.merge(user_all_true, on='user_idx', how='left')\n", + "users_pd['user_all_true'].fillna(0, inplace = True)\n", + "\n", + "\n", + "item_features = item_features.merge(item_train_mean, on='item_idx', how='left')\n", + "item_features['item_mean_rel'].fillna(full_mean, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_count, on='item_idx', how='left')\n", + "item_features['item_count_rel'].fillna(full_count, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_std, on='item_idx', how='left')\n", + "item_features['item_std_rel'].fillna(full_std, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_median, on='item_idx', how='left')\n", + "item_features['item_median_rel'].fillna(full_median, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_kurt, on='item_idx', how='left')\n", + "item_features['item_kurt_rel'].fillna(full_kurt, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_train_skew, on='item_idx', how='left')\n", + "item_features['item_skew_rel'].fillna(full_skew, inplace = True)\n", + "\n", + "item_features = item_features.merge(item_all_true, on='item_idx', how='left')\n", + "item_features['item_all_true'].fillna(0, inplace = True)\n", + "\n", + "item_features = item_features.merge(pop_item, on='item_idx', how='left')\n", + "item_features['item_is_pop'].fillna(0, inplace = True)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7948ea64-1786-42f0-bc01-f0172d41fd63", + "metadata": {}, + "outputs": [], + "source": [ + "#make it pyspark\n", + "user_features = spark.createDataFrame(users_pd) \n", + "user_features.cache()\n", + "user_features.printSchema()\n", + "print(\"total users: \",user_features.count())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "005148e2-4bd9-4e4e-9bec-5622ca34be3a", + "metadata": {}, + "outputs": [], + "source": [ + "item_features = spark.createDataFrame(item_features) \n", + "item_features.cache()\n", + "item_features.printSchema()\n", + "print(\"total users: \",item_features.count())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be6cf4d6-4ec2-40eb-a86d-4c82c7350069", + "metadata": {}, + "outputs": [], + "source": [ + "# item_features = indexer.transform(df=item_features)\n", + "# user_features = indexer.transform(df=user_features)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e45a708-0d64-4e1f-85e2-d2b91404cb9f", + "metadata": {}, + "outputs": [], + "source": [ + "cat_embed_cols = [\"agegroup\", 'gender', 'occupation', \"zip_code\"]\n", + "continuous_cols = [\"Crime\", \"Sci-Fi\", \"Musical\", \n", + " \"Mystery\", \"Documentary\", \"Fantasy\", \"Children\", \n", + " \"Drama\", \"Horror\", \"Adventure\", \"Western\",\n", + " \"Romance\", \"War\", \"Animation\", \"Action\", \"Comedy\", \"Thriller\",\n", + " \"Film-Noir\"]\n", + " # , \"age\",\n", + " # 'user_mean_rel', 'user_count_rel', 'user_std_rel',\n", + " # 'user_median_rel', 'user_kurt_rel', 'user_skew_rel',\n", + " # 'item_mean_rel', 'item_count_rel', 'item_std_rel',\n", + " # 'item_median_rel', 'item_kurt_rel', 'item_skew_rel']\n", + "wide_cols = ['gender', 
'occupation']\n", + "crossed_cols = ['gender_occupation']\n", + "\n", + "cols_item = {'continuous_cols':[], 'cat_embed_cols':[], 'wide_cols': []}\n", + "\n", + "cols_user = {'continuous_cols':[], 'cat_embed_cols':[], 'wide_cols': []}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8fc70e61-6e57-475b-a584-28f2143c978a", + "metadata": {}, + "outputs": [], + "source": [ + "e = Experiment(\n", + " [ MAP(K), \n", + " NDCG(K), \n", + " HitRate(K_list_metrics), \n", + " Coverage(K),\n", + " Surprisal(K),\n", + " MRR(K)],\n", + " test,\n", + " pos_neg_train,\n", + " query_column=\"user_idx\", item_column=\"item_idx\", rating_column=\"relevance\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1239639-00cd-4ab1-acdc-3c6d7b00870a", + "metadata": {}, + "outputs": [], + "source": [ + " bandit_models = {\n", + " 'Neural TS' : [NeuralTS(user_cols = cols_user,\n", + " item_cols = cols_item,\n", + " dim_head=20,\n", + " deep_out_dim=20,\n", + " hidden_layers=[32, 20],\n", + " embedding_sizes=[32, 32, 64],\n", + " wide_out_dim=1,\n", + " head_dropout=0.8,\n", + " deep_dropout=0.4,\n", + " n_epochs=2,\n", + " opt_lr = 3e-4,\n", + " lr_min = 1e-5,\n", + " use_gpu = False,\n", + " plot_dir='test.png',\n", + " use_warp_loss=True,\n", + " cnt_neg_samples=200,\n", + " cnt_samples_for_predict=10,\n", + " exploration_coef = +1.0\n", + " ), 'no_opt'] \n", + " }" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df0171a4-8c59-4c30-8034-23644dd0fe06", + "metadata": {}, + "outputs": [], + "source": [ + "def fit_predict_add_res(name, model, experiment, train, test, suffix=''):\n", + " \"\"\"\n", + " Run fit_predict for the `model`, measure time on fit_predict and evaluate metrics\n", + " \"\"\"\n", + " start_time=time.time()\n", + " \n", + " logs = {'log': train}\n", + " predict_params = {'k': K, 'users': test.select('user_idx').distinct()}\n", + " \n", + " if isinstance(model, (NeuralTS)):\n", + " logs['log'] = pos_neg_train\n", + "\n", + " if isinstance(model, (NeuralTS)):\n", + " logs['item_features'] = item_features\n", + " logs['user_features'] = user_features \n", + " predict_params.update(logs)\n", + "\n", + " model.fit(**logs)\n", + " fit_time = time.time() - start_time\n", + " \n", + " pred=model.predict(**predict_params)\n", + " pred.show(100)\n", + " pred.cache()\n", + " pred.count()\n", + " predict_time = time.time() - start_time - fit_time\n", + "\n", + " experiment.add_result(name + suffix, pred)\n", + " metric_time = time.time() - start_time - fit_time - predict_time\n", + " experiment.results.loc[name + suffix, 'fit_time'] = fit_time\n", + " experiment.results.loc[name + suffix, 'predict_time'] = predict_time\n", + " experiment.results.loc[name + suffix, 'metric_time'] = metric_time\n", + " experiment.results.loc[name + suffix, 'full_time'] = (fit_time + \n", + " predict_time +\n", + " metric_time)\n", + " pred.unpersist()\n", + " print(experiment.results[['NDCG@{}'.format(K), 'MRR@{}'.format(K), 'Coverage@{}'.format(K), 'fit_time']].sort_values('NDCG@{}'.format(K), ascending=False))\n", + " #add for me\n", + " return pred" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5e270a6-1077-43ef-86c0-9eed0aadd269", + "metadata": {}, + "outputs": [], + "source": [ + "def full_pipeline(models, experiment, train, test, suffix='', budget=BUDGET):\n", + " \"\"\"\n", + " For each model:\n", + " - if required: run hyperparameters search, set best params and save param values to `experiment`\n", + " - pass model 
to `fit_predict_add_res` \n", + " \"\"\"\n", + " \n", + " for name, [model, params] in models.items():\n", + " model.logger.info(msg='{} started'.format(name))\n", + " if params != 'no_opt':\n", + " model.logger.info(msg='{} optimization started'.format(name))\n", + " best_params = model.optimize(opt_train, \n", + " opt_val, \n", + " param_borders=params, \n", + " item_features=item_features,\n", + " user_features=user_features,\n", + " k=K, \n", + " budget=budget)\n", + " logger.info(msg='best params for {} are: {}'.format(name, best_params))\n", + " model.set_params(**best_params)\n", + " \n", + " logger.info(msg='{} fit_predict started'.format(name))\n", + " pred = fit_predict_add_res(name, model, experiment, train, test, suffix)\n", + " # here we call protected attribute to get all parameters set during model initialization\n", + " experiment.results.loc[name + suffix, 'params'] = str(model._init_args)\n", + " #add for me\n", + " return pred" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca54985e-3e70-451a-bd03-25cd8c2c2326", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "pred = full_pipeline(bandit_models, e, train, test)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "425f2216-b01f-4c5a-bc63-dc5521bcefd5", + "metadata": {}, + "outputs": [], + "source": [ + "e.results.sort_values('NDCG@10', ascending=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/replay/experimental/models/__init__.py b/replay/experimental/models/__init__.py index cebd3adc5..d95518d82 100644 --- a/replay/experimental/models/__init__.py +++ b/replay/experimental/models/__init__.py @@ -6,5 +6,6 @@ from replay.experimental.models.implicit_wrap import ImplicitWrap from replay.experimental.models.lightfm_wrap import LightFMWrap from replay.experimental.models.mult_vae import MultVAE +from replay.experimental.models.neural_ts import NeuralTS from replay.experimental.models.neuromf import NeuroMF from replay.experimental.models.scala_als import ScalaALSWrap diff --git a/replay/experimental/models/base_rec.py b/replay/experimental/models/base_rec.py index 39e7d92bf..47b546375 100644 --- a/replay/experimental/models/base_rec.py +++ b/replay/experimental/models/base_rec.py @@ -307,6 +307,75 @@ def _predict( ``[user_idx, item_idx, relevance]`` """ + def _predict_proba( + self, + log: SparkDataFrame, + k: int, + users: SparkDataFrame, + items: SparkDataFrame, + user_features: Optional[SparkDataFrame] = None, + item_features: Optional[SparkDataFrame] = None, + filter_seen_items: bool = True, + ) -> np.ndarray: + """ + Inner method where model actually predicts probability estimates. + + Mainly used in ```OBPOfflinePolicyLearner```. 
+ + :param log: historical log of interactions + ``[user_idx, item_idx, timestamp, rating]`` + :param k: number of recommendations for each user + :param users: users to create recommendations for + dataframe containing ``[user_idx]`` or ``array-like``; + if ``None``, recommend to all users from ``log`` + :param items: candidate items for recommendations + dataframe containing ``[item_idx]`` or ``array-like``; + if ``None``, take all items from ``log``. + If it contains new items, ``rating`` for them will be ``0``. + :param user_features: user features + ``[user_idx , timestamp]`` + feature columns + :param item_features: item features + ``[item_idx , timestamp]`` + feature columns + :param filter_seen_items: flag to remove seen items from recommendations based on ``log``. + :return: distribution over items for each user with shape + ``(n_users, n_items, k)`` + where we have probability for each user to choose item at fixed position(top-k). + """ + + n_users = users.select("user_idx").count() + n_items = items.select("item_idx").count() + + recs = self._predict( + log, + k, + users, + items, + user_features, + item_features, + filter_seen_items, + ) + + recs = get_top_k_recs(recs, k=k).select("user_idx", "item_idx", "relevance") + + cols = [f"k{i}" for i in range(k)] + + recs_items = ( + recs.groupBy("user_idx") + .agg(sf.collect_list("item_idx").alias("item_idx")) + .select([sf.col("item_idx")[i].alias(cols[i]) for i in range(k)]) + ) + + action_dist = np.zeros(shape=(n_users, n_items, k)) + + for i in range(k): + action_dist[ + np.arange(n_users), + recs_items.select(cols[i]).toPandas()[cols[i]].to_numpy(), + np.ones(n_users, dtype=int) * i, + ] += 1 + + return action_dist + def _get_fit_counts(self, entity: str) -> int: if not hasattr(self, f"_num_{entity}s"): setattr( diff --git a/replay/experimental/models/neural_ts.py b/replay/experimental/models/neural_ts.py new file mode 100644 index 000000000..9cb14442e --- /dev/null +++ b/replay/experimental/models/neural_ts.py @@ -0,0 +1,983 @@ +import os +from typing import Dict, List, Optional, Tuple, Union + +import joblib +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch +from IPython.display import clear_output +from pyspark.sql import DataFrame +from sklearn.preprocessing import MinMaxScaler, OneHotEncoder +from torch import Tensor, nn +from torch.utils.data import DataLoader, Dataset, SequentialSampler +from tqdm import tqdm + +from replay.experimental.models.base_rec import HybridRecommender +from replay.splitters import TimeSplitter +from replay.utils.spark_utils import convert2spark + +pd.options.mode.chained_assignment = None + + +def cartesian_product(left, right): + """ + This function computes cartesian product. + """ + return left.assign(key=1).merge(right.assign(key=1), on="key").drop(columns=["key"]) + + +def num_tries_gt_zero(scores, batch_size, max_trials, max_num, device): + """ + scores: [batch_size x N] float scores + returns: [batch_size x 1] the lowest indice per row where scores were first greater than 0. 
plus 1 + """ + tmp = scores.gt(0).nonzero().t() + # We offset these values by 1 to look for unset values (zeros) later + values = tmp[1] + 1 + # Sparse tensors can't be moved with .to() or .cuda() if you want to send in cuda variables first + if device.type == "cuda": + tau = torch.cuda.sparse.LongTensor(tmp, values, torch.Size((batch_size, max_trials + 1))).to_dense() + else: + tau = torch.sparse.LongTensor(tmp, values, torch.Size((batch_size, max_trials + 1))).to_dense() + tau[(tau == 0)] += max_num # set all unused indices to be max possible number so its not picked by min() call + + tries = torch.min(tau, dim=1)[0] + return tries + + +def w_log_loss(output, target, device): + """ + This function computes weighted logistic loss. + """ + output = torch.nn.functional.sigmoid(output) + output = torch.clamp(output, min=1e-7, max=1 - 1e-7) + count_1 = target.sum().item() + count_0 = target.shape[0] - count_1 + class_count = np.array([count_0, count_1]) + if count_1 == 0 or count_0 == 0: # noqa: SIM108 + weight = np.array([1.0, 1.0]) + else: + weight = np.max(class_count) / class_count + weight = Tensor(weight).to(device) + loss = weight[1] * target * torch.log(output) + weight[0] * (1 - target) * torch.log(1 - output) + return -loss.mean() + + +def warp_loss(positive_predictions, negative_predictions, num_labels, device): + """ + positive_predictions: [batch_size x 1] floats between -1 to 1 + negative_predictions: [batch_size x N] floats between -1 to 1 + num_labels: int total number of labels in dataset (not just the subset you're using for the batch) + device: pytorch.device + """ + batch_size, max_trials = negative_predictions.size(0), negative_predictions.size(1) + + offsets, ones, max_num = ( + torch.arange(0, batch_size, 1).long().to(device) * (max_trials + 1), + torch.ones(batch_size, 1).float().to(device), + batch_size * (max_trials + 1), + ) + + sample_scores = 1 + negative_predictions - positive_predictions + # Add column of ones so we know when we used all our attempts. + # This is used for indexing and computing should_count_loss if no real value is above 0 + sample_scores, negative_predictions = ( + torch.cat([sample_scores, ones], dim=1), + torch.cat([negative_predictions, ones], dim=1), + ) + + tries = num_tries_gt_zero(sample_scores, batch_size, max_trials, max_num, device) + attempts, trial_offset = tries.float(), (tries - 1) + offsets + # Don't count loss if we used max number of attempts + loss_weights = torch.log(torch.floor((num_labels - 1) / attempts)) + should_count_loss = (attempts <= max_trials).float() + losses = ( + loss_weights + * ((1 - positive_predictions.view(-1)) + negative_predictions.view(-1)[trial_offset]) + * should_count_loss + ) + + return losses.sum() + + +class SamplerWithReset(SequentialSampler): + """ + Sampler class for train dataloader. + """ + + def __iter__(self): + self.data_source.reset() + return super().__iter__() + + +class UserDatasetWithReset(Dataset): + """ + Dataset class that takes data for a single user and + column names for continuous data, categorical data and data for + Wide model as well as the name of the target column. + The class also supports sampling of negative examples. 
+ """ + + def __init__( + self, + idx, + log_train, + user_features, + item_features, + list_items, + union_cols, + cnt_neg_samples, + device, + target: Optional[str] = None, + ): + if cnt_neg_samples is not None: + self.cnt_neg_samples = cnt_neg_samples + self.user_features = user_features + self.item_features = item_features + item_idx_user = log_train["item_idx"].values.tolist() + self.item_idx_not_user = list(set(list_items).difference(set(item_idx_user))) + else: + self.cnt_neg_samples = cnt_neg_samples + self.user_features = None + self.item_features = None + self.item_idx_not_user = None + self.device = device + self.union_cols = union_cols + dataframe = log_train.merge(user_features, on="user_idx", how="inner") + self.dataframe = dataframe.merge(item_features, on="item_idx", how="inner") + self.user_idx = idx + self.data_sample = None + self.wide_part = Tensor(self.dataframe[self.union_cols["wide_cols"]].to_numpy().astype("float32")).to( + self.device + ) + self.continuous_part = Tensor( + self.dataframe[self.union_cols["continuous_cols"]].to_numpy().astype("float32") + ).to(self.device) + self.cat_part = Tensor(self.dataframe[self.union_cols["cat_embed_cols"]].to_numpy().astype("float32")).to( + self.device + ) + self.users = Tensor(self.dataframe[["user_idx"]].to_numpy().astype("int")).to(torch.long).to(self.device) + self.items = Tensor(self.dataframe[["item_idx"]].to_numpy().astype("int")).to(torch.long).to(self.device) + if target is not None: + self.target = Tensor(dataframe[target].to_numpy().astype("int")).to(self.device) + else: + self.target = target + self.target_column = target + + def get_parts(self, data_sample): + """ + Dataset method that selects user index, item indexes, categorical data, + continuous data, data for wide model, and target value. + """ + self.wide_part = Tensor(data_sample[self.union_cols["wide_cols"]].to_numpy().astype("float32")).to(self.device) + self.continuous_part = Tensor(data_sample[self.union_cols["continuous_cols"]].to_numpy().astype("float32")).to( + self.device + ) + self.cat_part = Tensor(data_sample[self.union_cols["cat_embed_cols"]].to_numpy().astype("float32")).to( + self.device + ) + self.users = Tensor(data_sample[["user_idx"]].to_numpy().astype("int")).to(torch.long).to(self.device) + self.items = Tensor(data_sample[["item_idx"]].to_numpy().astype("int")).to(torch.long).to(self.device) + if self.target_column is not None: + self.target = Tensor(data_sample[self.target_column].to_numpy().astype("int")).to(self.device) + else: + self.target = self.target_column + + def __getitem__(self, idx): + target = -1 + if self.target is not None: + target = self.target[idx] + return ( + self.wide_part[idx], + self.continuous_part[idx], + self.cat_part[idx], + self.users[idx], + self.items[idx], + target, + ) + + def __len__(self): + if self.data_sample is not None: + return self.data_sample.shape[0] + else: + return self.dataframe.shape[0] + + def get_size_features(self): + """ + Dataset method that gets the size of features after encoding/scaling. + """ + return self.wide_part.shape[1], self.continuous_part.shape[1], self.cat_part.shape[1] + + def reset(self): + """ + Dataset methos that samples new negative examples.. 
+ """ + n_samples = min(len(self.item_idx_not_user), self.cnt_neg_samples) + if n_samples > 0: + sample_item = np.random.choice(self.item_idx_not_user, n_samples, replace=False) + sample_item_feat = self.item_features.loc[self.item_features["item_idx"].isin(sample_item)] + sample_item_feat = sample_item_feat.set_axis(range(sample_item_feat.shape[0]), axis="index") + df_sample = cartesian_product( + self.user_features.loc[self.user_features["user_idx"] == self.user_idx], sample_item_feat + ) + df_sample[self.target_column] = 0 + self.data_sample = pd.concat([self.dataframe, df_sample], axis=0, ignore_index=True) + self.get_parts(self.data_sample) + + +class Wide(nn.Module): + """ + Wide model based on https://arxiv.org/abs/1606.07792 + """ + + def __init__(self, input_dim: int, out_dim: int = 1): + super().__init__() + + self.linear = nn.Sequential(nn.Linear(input_dim, out_dim), nn.ReLU(), nn.BatchNorm1d(out_dim)) + self.out_dim = out_dim + + def forward(self, input_data): + """ + :param input_data: wide features + :return: torch tensor with shape batch_size*out_dim + """ + output = self.linear(input_data) + return output + + +class Deep(nn.Module): + """ + Deep model based on https://arxiv.org/abs/1606.07792 + """ + + def __init__(self, input_dim: int, out_dim: int, hidden_layers: List[int], deep_dropout: float): + super().__init__() + model = [] + last_size = input_dim + for cur_size in hidden_layers: + model += [nn.Linear(last_size, cur_size)] + model += [nn.ReLU()] + model += [nn.BatchNorm1d(cur_size)] + model += [nn.Dropout(deep_dropout)] + last_size = cur_size + model += [nn.Linear(last_size, out_dim)] + model += [nn.ReLU()] + model += [nn.BatchNorm1d(out_dim)] + model += [nn.Dropout(deep_dropout)] + self.deep_model = nn.Sequential(*model) + + def forward(self, input_data): + """ + :param input_data: deep features + :return: torch tensor with shape batch_size*out_dim + """ + output = self.deep_model(input_data) + return output + + +class EmbedModel(nn.Module): + """ + Model for getting embeddings for user and item indexes. 
+ """ + + def __init__(self, cnt_users: int, cnt_items: int, user_embed: int, item_embed: int, crossed_embed: int): + super().__init__() + self.user_embed = nn.Embedding(num_embeddings=cnt_users, embedding_dim=user_embed) + self.item_embed = nn.Embedding(num_embeddings=cnt_items, embedding_dim=item_embed) + self.user_crossed_embed = nn.Embedding(num_embeddings=cnt_users, embedding_dim=crossed_embed) + self.item_crossed_embed = nn.Embedding(num_embeddings=cnt_items, embedding_dim=crossed_embed) + + def forward(self, users, items): + """ + :param users: user indexes + :param items: item indexes + :return: torch tensors: embedings for users, embedings for items, + embedings for users for wide model, + embedings for items for wide model, + embedings for pairs (users, items) for wide model + """ + users_to_embed = self.user_embed(users).squeeze() + items_to_embed = self.item_embed(items).squeeze() + cross_users = self.user_crossed_embed(users).squeeze() + cross_items = self.item_crossed_embed(items).squeeze() + cross = (cross_users * cross_items).sum(dim=-1).unsqueeze(-1) + return users_to_embed, items_to_embed, cross_users, cross_items, cross + + +class WideDeep(nn.Module): + """ + Wide&Deep model based on https://arxiv.org/abs/1606.07792 + """ + + def __init__( + self, + dim_head: int, + deep_out_dim: int, + hidden_layers: List[int], + size_wide_features: int, + size_continuous_features: int, + size_cat_features: int, + wide_out_dim: int, + head_dropout: float, + deep_dropout: float, + cnt_users: int, + cnt_items: int, + user_embed: int, + item_embed: int, + crossed_embed: int, + ): + super().__init__() + self.embed_model = EmbedModel(cnt_users, cnt_items, user_embed, item_embed, crossed_embed) + self.wide = Wide(size_wide_features + crossed_embed * 2 + 1, wide_out_dim) + self.deep = Deep( + size_cat_features + size_continuous_features + user_embed + item_embed, + deep_out_dim, + hidden_layers, + deep_dropout, + ) + self.head_model = nn.Sequential(nn.Linear(wide_out_dim + deep_out_dim, dim_head), nn.ReLU()) + self.last_layer = nn.Sequential(nn.Linear(dim_head, 1)) + self.head_dropout = head_dropout + + def forward_for_predict(self, wide_part, continuous_part, cat_part, users, items): + """ + Forward method without last layer and dropout that is used for prediction. + """ + users_to_embed, items_to_embed, cross_users, cross_items, cross = self.embed_model(users, items) + input_deep = torch.cat((cat_part, continuous_part, users_to_embed, items_to_embed), dim=-1).squeeze() + out_deep = self.deep(input_deep) + wide_part = torch.cat((wide_part, cross_users, cross_items, cross), dim=-1) + out_wide = self.wide(wide_part) + input_data = torch.cat((out_wide, out_deep), dim=-1) + output = self.head_model(input_data) + return output + + def forward_dropout(self, input_data): + """ + Forward method for multiple prediction with active dropout + :param input_data: output of forward_for_predict + :return: torch tensor after dropout and last linear layer + """ + output = nn.functional.dropout(input_data, p=self.head_dropout, training=True) + output = self.last_layer(output) + return output + + def forward_for_embeddings( + self, wide_part, continuous_part, cat_part, users_to_embed, items_to_embed, cross_users, cross_items, cross + ): + """ + Forward method after getting emdeddings for users and items. 
+ """ + input_deep = torch.cat((cat_part, continuous_part, users_to_embed, items_to_embed), dim=-1).squeeze() + out_deep = self.deep(input_deep) + wide_part = torch.cat((wide_part, cross_users, cross_items, cross), dim=-1) + out_wide = self.wide(wide_part) + input_data = torch.cat((out_wide, out_deep), dim=-1) + output = self.head_model(input_data) + output = nn.functional.dropout(output, p=self.head_dropout, training=True) + output = self.last_layer(output) + return output + + def forward(self, wide_part, continuous_part, cat_part, users, items): + """ + :param wide_part: features for wide model + :param continuous_part: continuous features + :param cat_part: torch categorical features + :param users: user indexes + :param items: item indexes + :return: relevances for pair (users, items) + + """ + users_to_embed, items_to_embed, cross_users, cross_items, cross = self.embed_model(users, items) + output = self.forward_for_embeddings( + wide_part, continuous_part, cat_part, users_to_embed, items_to_embed, cross_users, cross_items, cross + ) + return output + + +class NeuralTS(HybridRecommender): + """ + 'Neural Thompson sampling recommender + `_ based on `Wide&Deep model + `_. + + :param user_cols: cols_user = {'continuous_cols':List[str], 'cat_embed_cols':List[str], 'wide_cols': List[str]}, + where List[str] -- some column names from user_features dataframe, which is input to the fit method, + or empty List + :param item_cols: cols_item = {'continuous_cols':List[str], 'cat_embed_cols':List[str], 'wide_cols': List[str]}, + where List[str] -- some column names from item_features dataframe, which is input to the fit method, + or empty List + :param dim_head: output size for WideDeep model head + :param deep_out_dim: output size for the Deep model + :param hidden_layers: list of hidden layer sizes for Deep model + :param embedding_sizes: list of length three in which + embedding_sizes[0] = embedding size for users, + embedding_sizes[1] = embedding size for items, + embedding_sizes[2] = embedding size for pair (users, items) + :param wide_out_dim: output size for the Wide model + :param head_dropout: probability of an element to be zeroed for WideDeep model head + :param deep_dropout: probability of an element to be zeroed for Deep model + :param n_epochs: number of epochs for model training + :param opt_lr: learning rate for the AdamW optimizer + :param lr_min: minimum learning rate value for the CosineAnnealingLR learning rate scheduler + :param use_gpu: if true, the model will be trained on the GPU + :param plot_dir: file name where the training graphs will be saved, if None, the graphs will not be saved + :param use_warp_loss: if true, then warp loss will be used otherwise weighted logistic loss. 
+ :param cnt_neg_samples: number of additional negative examples for each user + :param cnt_samples_for_predict: number of sampled predictions for one user, + which are used to estimate the mean and variance of relevance + :param exploration_coef: exploration coefficient + """ + + def __init__( + self, + user_cols: Dict[str, List[str]] = {"continuous_cols": [], "cat_embed_cols": [], "wide_cols": []}, + item_cols: Dict[str, List[str]] = {"continuous_cols": [], "cat_embed_cols": [], "wide_cols": []}, + embedding_sizes: List[int] = [32, 32, 64], + hidden_layers: List[int] = [32, 20], + wide_out_dim: int = 1, + deep_out_dim: int = 20, + head_dropout: float = 0.8, + deep_dropout: float = 0.4, + dim_head: int = 20, + n_epochs: int = 2, + opt_lr: float = 3e-4, + lr_min: float = 1e-5, + use_gpu: bool = False, + use_warp_loss: bool = True, + cnt_neg_samples: int = 100, + cnt_samples_for_predict: int = 10, + exploration_coef: float = 1.0, + cnt_users: Optional[int] = None, + cnt_items: Optional[int] = None, + plot_dir: Optional[str] = None, + ): + self.user_cols = user_cols + self.item_cols = item_cols + self.embedding_sizes = embedding_sizes + self.hidden_layers = hidden_layers + self.wide_out_dim = wide_out_dim + self.deep_out_dim = deep_out_dim + self.head_dropout = head_dropout + self.deep_dropout = deep_dropout + self.dim_head = dim_head + self.n_epochs = n_epochs + self.opt_lr = opt_lr + self.lr_min = lr_min + self.device = torch.device("cpu") + if use_gpu: + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.use_warp_loss = use_warp_loss + self.cnt_neg_samples = cnt_neg_samples + self.cnt_samples_for_predict = cnt_samples_for_predict + self.exploration_coef = exploration_coef + self.cnt_users = cnt_users + self.cnt_items = cnt_items + self.plot_dir = plot_dir + + self.size_wide_features = None + self.size_continuous_features = None + self.size_cat_features = None + self.scaler_user = None + self.encoder_intersept_user = None + self.encoder_diff_user = None + self.scaler_item = None + self.encoder_intersept_item = None + self.encoder_diff_item = None + self.union_cols = None + self.num_of_train_labels = None + self.dict_true_items_val = None + self.lr_scheduler = None + self.model = None + self.criterion = None + self.optimizer = None + + def preprocess_features_fit(self, train, item_features, user_features): + """ + This function initializes all ecoders for the features. 
+ """ + train_users = user_features.loc[user_features["user_idx"].isin(train["user_idx"].values.tolist())] + wide_cols_cat = list(set(self.user_cols["cat_embed_cols"]) & set(self.user_cols["wide_cols"])) + cat_embed_cols_not_wide = list(set(self.user_cols["cat_embed_cols"]).difference(set(wide_cols_cat))) + if len(self.user_cols["continuous_cols"]) != 0: + self.scaler_user = MinMaxScaler() + self.scaler_user.fit(train_users[self.user_cols["continuous_cols"]]) + else: + self.scaler_user = None + if len(wide_cols_cat) != 0: + self.encoder_intersept_user = OneHotEncoder(sparse=False, handle_unknown="ignore") + self.encoder_intersept_user.fit(train_users[wide_cols_cat]) + else: + self.encoder_intersept_user = None + if len(cat_embed_cols_not_wide) != 0: + self.encoder_diff_user = OneHotEncoder(sparse=False, handle_unknown="ignore") + self.encoder_diff_user.fit(train_users[cat_embed_cols_not_wide]) + else: + self.encoder_diff_user = None + train_items = item_features.loc[item_features["item_idx"].isin(train["item_idx"].values.tolist())] + wide_cols_cat = list(set(self.item_cols["cat_embed_cols"]) & set(self.item_cols["wide_cols"])) + cat_embed_cols_not_wide = list(set(self.item_cols["cat_embed_cols"]).difference(set(wide_cols_cat))) + if len(self.item_cols["continuous_cols"]) != 0: + self.scaler_item = MinMaxScaler() + self.scaler_item.fit(train_items[self.item_cols["continuous_cols"]]) + else: + self.scaler_item = None + if len(wide_cols_cat) != 0: + self.encoder_intersept_item = OneHotEncoder(sparse=False, handle_unknown="ignore") + self.encoder_intersept_item.fit(train_items[wide_cols_cat]) + else: + self.encoder_intersept_item = None + if len(cat_embed_cols_not_wide) != 0: + self.encoder_diff_item = OneHotEncoder(sparse=False, handle_unknown="ignore") + self.encoder_diff_item.fit(train_items[cat_embed_cols_not_wide]) + else: + self.encoder_diff_item = None + + def preprocess_features_transform(self, item_features, user_features): + """ + This function performs the transformation for all features. 
+ """ + self.union_cols = {"continuous_cols": [], "cat_embed_cols": [], "wide_cols": []} + wide_cols_cat = list(set(self.user_cols["cat_embed_cols"]) & set(self.user_cols["wide_cols"])) + cat_embed_cols_not_wide = list(set(self.user_cols["cat_embed_cols"]).difference(set(wide_cols_cat))) + if len(self.user_cols["continuous_cols"]) != 0: + users_continuous = pd.DataFrame( + self.scaler_user.transform(user_features[self.user_cols["continuous_cols"]]), + columns=self.user_cols["continuous_cols"], + ) + self.union_cols["continuous_cols"] += self.user_cols["continuous_cols"] + else: + users_continuous = user_features[[]] + if len(wide_cols_cat) != 0: + users_wide_cat = pd.DataFrame( + self.encoder_intersept_user.transform(user_features[wide_cols_cat]), + columns=list(self.encoder_intersept_user.get_feature_names_out(wide_cols_cat)), + ) + self.union_cols["cat_embed_cols"] += list(self.encoder_intersept_user.get_feature_names_out(wide_cols_cat)) + self.union_cols["wide_cols"] += list( + set(self.user_cols["wide_cols"]).difference(set(self.user_cols["cat_embed_cols"])) + ) + list(self.encoder_intersept_user.get_feature_names_out(wide_cols_cat)) + else: + users_wide_cat = user_features[[]] + if len(cat_embed_cols_not_wide) != 0: + users_cat = pd.DataFrame( + self.encoder_diff_user.transform(user_features[cat_embed_cols_not_wide]), + columns=list(self.encoder_diff_user.get_feature_names_out(cat_embed_cols_not_wide)), + ) + self.union_cols["cat_embed_cols"] += list( + self.encoder_diff_user.get_feature_names_out(cat_embed_cols_not_wide) + ) + else: + users_cat = user_features[[]] + + transform_user_features = pd.concat( + [user_features[["user_idx"]], users_continuous, users_wide_cat, users_cat], axis=1 + ) + + wide_cols_cat = list(set(self.item_cols["cat_embed_cols"]) & set(self.item_cols["wide_cols"])) + cat_embed_cols_not_wide = list(set(self.item_cols["cat_embed_cols"]).difference(set(wide_cols_cat))) + if len(self.item_cols["continuous_cols"]) != 0: + items_continuous = pd.DataFrame( + self.scaler_item.transform(item_features[self.item_cols["continuous_cols"]]), + columns=self.item_cols["continuous_cols"], + ) + self.union_cols["continuous_cols"] += self.item_cols["continuous_cols"] + else: + items_continuous = item_features[[]] + if len(wide_cols_cat) != 0: + items_wide_cat = pd.DataFrame( + self.encoder_intersept_item.transform(item_features[wide_cols_cat]), + columns=list(self.encoder_intersept_item.get_feature_names_out(wide_cols_cat)), + ) + self.union_cols["cat_embed_cols"] += list(self.encoder_intersept_item.get_feature_names_out(wide_cols_cat)) + self.union_cols["wide_cols"] += list( + set(self.item_cols["wide_cols"]).difference(set(self.item_cols["cat_embed_cols"])) + ) + list(self.encoder_intersept_item.get_feature_names_out(wide_cols_cat)) + else: + items_wide_cat = item_features[[]] + if len(cat_embed_cols_not_wide) != 0: + items_cat = pd.DataFrame( + self.encoder_diff_item.transform(item_features[cat_embed_cols_not_wide]), + columns=list(self.encoder_diff_item.get_feature_names_out(cat_embed_cols_not_wide)), + ) + self.union_cols["cat_embed_cols"] += list( + self.encoder_diff_item.get_feature_names_out(cat_embed_cols_not_wide) + ) + else: + items_cat = item_features[[]] + + transform_item_features = pd.concat( + [item_features[["item_idx"]], items_continuous, items_wide_cat, items_cat], axis=1 + ) + return transform_user_features, transform_item_features + + def _data_loader( + self, idx, log_train, transform_user_features, transform_item_features, list_items, train=False + ) -> 
Union[Tuple[UserDatasetWithReset, DataLoader], DataLoader]: + if train: + train_dataset = UserDatasetWithReset( + idx=idx, + log_train=log_train, + user_features=transform_user_features, + item_features=transform_item_features, + list_items=list_items, + union_cols=self.union_cols, + cnt_neg_samples=self.cnt_neg_samples, + device=self.device, + target="relevance", + ) + sampler = SamplerWithReset(train_dataset) + train_dataloader = DataLoader( + train_dataset, batch_size=log_train.shape[0] + self.cnt_neg_samples, sampler=sampler + ) + return train_dataset, train_dataloader + else: + dataset = UserDatasetWithReset( + idx=idx, + log_train=log_train, + user_features=transform_user_features, + item_features=transform_item_features, + list_items=list_items, + union_cols=self.union_cols, + cnt_neg_samples=None, + device=self.device, + target=None, + ) + dataloader = DataLoader(dataset, batch_size=log_train.shape[0], shuffle=False) + return dataloader + + def _fit( + self, + log: DataFrame, + user_features: Optional[DataFrame] = None, + item_features: Optional[DataFrame] = None, + ) -> None: + if user_features is None: + msg = "User features are missing for fitting" + raise ValueError(msg) + if item_features is None: + msg = "Item features are missing for fitting" + raise ValueError(msg) + + train_spl = TimeSplitter( + time_threshold=0.2, + drop_cold_items=True, + drop_cold_users=True, + query_column="user_idx", + item_column="item_idx", + ) + train, val = train_spl.split(log) + train = train.drop("timestamp") + val = val.drop("timestamp") + + train = train.toPandas() + val = val.toPandas() + pd_item_features = item_features.toPandas() + pd_user_features = user_features.toPandas() + if self.cnt_users is None: + self.cnt_users = pd_user_features.shape[0] + if self.cnt_items is None: + self.cnt_items = pd_item_features.shape[0] + self.num_of_train_labels = self.cnt_items + + self.preprocess_features_fit(train, pd_item_features, pd_user_features) + transform_user_features, transform_item_features = self.preprocess_features_transform( + pd_item_features, pd_user_features + ) + + list_items = pd_item_features["item_idx"].values.tolist() + + dataloader_train_users = [] + train = train.set_axis(range(train.shape[0]), axis="index") + train_group_by_users = train.groupby("user_idx") + for idx, df_train_idx in tqdm(train_group_by_users): + df_train_idx = df_train_idx.loc[df_train_idx["relevance"] == 1] + if df_train_idx.shape[0] == 0: + continue + df_train_idx = df_train_idx.set_axis(range(df_train_idx.shape[0]), axis="index") + train_dataset, train_dataloader = self._data_loader( + idx, df_train_idx, transform_user_features, transform_item_features, list_items, train=True + ) + dataloader_train_users.append(train_dataloader) + + dataloader_val_users = [] + self.dict_true_items_val = {} + transform_item_features.sort_values(by=["item_idx"], inplace=True, ignore_index=True) + val = val.set_axis(range(val.shape[0]), axis="index") + val_group_by_users = val.groupby("user_idx") + for idx, df_val_idx in tqdm(val_group_by_users): + self.dict_true_items_val[idx] = df_val_idx.loc[(df_val_idx["relevance"] == 1)]["item_idx"].values.tolist() + df_val = cartesian_product(pd.DataFrame({"user_idx": [idx]}), transform_item_features[["item_idx"]]) + df_val = df_val.set_axis(range(df_val.shape[0]), axis="index") + dataloader_val_users.append( + self._data_loader( + idx, df_val, transform_user_features, transform_item_features, list_items, train=False + ) + ) + + self.size_wide_features, self.size_continuous_features, 
self.size_cat_features = ( + train_dataset.get_size_features() + ) + self.model = WideDeep( + dim_head=self.dim_head, + deep_out_dim=self.deep_out_dim, + hidden_layers=self.hidden_layers, + size_wide_features=self.size_wide_features, + size_continuous_features=self.size_continuous_features, + size_cat_features=self.size_cat_features, + wide_out_dim=self.wide_out_dim, + head_dropout=self.head_dropout, + deep_dropout=self.deep_dropout, + cnt_users=self.cnt_users, + cnt_items=self.cnt_items, + user_embed=self.embedding_sizes[0], + item_embed=self.embedding_sizes[1], + crossed_embed=self.embedding_sizes[2], + ) + if self.use_warp_loss: + self.criterion = warp_loss + else: + self.criterion = w_log_loss + self.optimizer = torch.optim.AdamW(self.model.parameters(), self.opt_lr) + self.lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, self.n_epochs, self.lr_min) + + self.train(self.model, dataloader_train_users, dataloader_val_users) + + def train(self, model, train_dataloader, val_dataloader): + """ + Run training loop. + """ + train_losses = [] + val_ndcg = [] + model = model.to(self.device) + for epoch in range(self.n_epochs): + train_loss = self._batch_pass(model, train_dataloader) + ndcg = self.predict_val_with_ndcg(model, val_dataloader, k=10) + train_losses.append(train_loss) + val_ndcg.append(ndcg) + + if self.plot_dir is not None and epoch > 0: + clear_output(wait=True) + _, (ax1, ax2) = plt.subplots(1, 2, figsize=(30, 15)) + ax1.plot(train_losses, label="train", color="b") + ax2.plot(val_ndcg, label="val_ndcg", color="g") + size = max(1, round(epoch / 10)) + plt.xticks(range(epoch - 1)[::size]) + ax1.set_ylabel("loss") + ax1.set_xlabel("epoch") + ax2.set_ylabel("ndcg") + ax2.set_xlabel("epoch") + plt.legend() + plt.savefig(self.plot_dir) + plt.show() + self.logger.info("ndcg val =%.4f", ndcg) + + def _loss(self, preds, labels): + if self.use_warp_loss: + ind_pos = torch.where(labels == 1)[0] + ind_neg = torch.where(labels == 0)[0] + min_batch = ind_pos.shape[0] + if ind_pos.shape[0] == 0 or ind_neg.shape[0] == 0: + return + indexes_pos = ind_pos + pos = preds.squeeze()[indexes_pos].unsqueeze(-1) + list_neg = [] + for _ in range(min_batch): + indexes_neg = ind_neg[torch.randperm(ind_neg.shape[0])] + list_neg.append(preds.squeeze()[indexes_neg].unsqueeze(-1)) + neg = torch.cat(list_neg, dim=-1) + neg = neg.transpose(0, 1) + loss = self.criterion(pos, neg, self.num_of_train_labels, self.device) + else: + loss = self.criterion(preds.squeeze(), labels, self.device) + return loss + + def _batch_pass(self, model, train_dataloader): + """ + Run training one epoch loop. + """ + model.train() + idx = 0 + cumulative_loss = 0 + preds = None + for user_dataloader in tqdm(train_dataloader): + for batch in user_dataloader: + wide_part, continuous_part, cat_part, users, items, labels = batch + self.optimizer.zero_grad() + preds = model(wide_part, continuous_part, cat_part, users, items) + loss = self._loss(preds, labels) + if loss is not None: + loss.backward() + self.optimizer.step() + cumulative_loss += loss.item() + idx += 1 + + self.lr_scheduler.step() + return cumulative_loss / idx + + def predict_val_with_ndcg(self, model, val_dataloader, k): + """ + This function returns the NDCG metric for the validation data. 
+ """ + if len(val_dataloader) == 0: + return 0 + + ndcg = 0 + idx = 0 + model = model.to(self.device) + for user_dataloader in tqdm(val_dataloader): + _, _, _, users, _, _ = next(iter(user_dataloader)) + user = int(users[0]) + sample_pred = np.array(self.predict_val(model, user_dataloader)) + top_k_predicts = (-sample_pred).argsort()[:k] + ndcg += (np.isin(top_k_predicts, self.dict_true_items_val[user]).sum()) / k + idx += 1 + + metric = ndcg / idx + return metric + + def predict_val(self, model, val_dataloader): + """ + This function returns the relevances for the validation data. + """ + probs = [] + model = model.to(self.device) + model.eval() + with torch.no_grad(): + for wide_part, continuous_part, cat_part, users, items, _ in val_dataloader: + preds = model(wide_part, continuous_part, cat_part, users, items) + probs += (preds.squeeze()).tolist() + return probs + + def predict_test(self, model, test_dataloader, cnt_samples_for_predict): + """ + This function returns a list of cnt_samples_for_predict relevancies for each pair (users, items) + in val_dataloader + """ + probs = [] + model = model.to(self.device) + model.eval() + with torch.no_grad(): + for wide_part, continuous_part, cat_part, users, items, _ in test_dataloader: + preds = model.forward_for_predict(wide_part, continuous_part, cat_part, users, items) + probs.extend(model.forward_dropout(preds).squeeze().tolist() for __ in range(cnt_samples_for_predict)) + return probs + + def _predict( + self, + log: DataFrame, # noqa: ARG002 + k: int, # noqa: ARG002 + users: DataFrame, + items: DataFrame, + user_features: Optional[DataFrame] = None, + item_features: Optional[DataFrame] = None, + filter_seen_items: bool = True, # noqa: ARG002 + ) -> DataFrame: + if user_features is None: + msg = "User features are missing for predict" + raise ValueError(msg) + if item_features is None: + msg = "Item features are missing for predict" + raise ValueError(msg) + + pd_users = users.toPandas() + pd_items = items.toPandas() + pd_user_features = user_features.toPandas() + pd_item_features = item_features.toPandas() + + list_items = pd_item_features["item_idx"].values.tolist() + + transform_user_features, transform_item_features = self.preprocess_features_transform( + pd_item_features, pd_user_features + ) + + preds = [] + users_ans = [] + items_ans = [] + for idx in tqdm(pd_users["user_idx"].unique()): + df_test_idx = cartesian_product(pd.DataFrame({"user_idx": [idx]}), pd_items) + df_test_idx = df_test_idx.set_axis(range(df_test_idx.shape[0]), axis="index") + test_dataloader = self._data_loader( + idx, df_test_idx, transform_user_features, transform_item_features, list_items, train=False + ) + + samples = np.array(self.predict_test(self.model, test_dataloader, self.cnt_samples_for_predict)) + sample_pred = np.mean(samples, axis=0) + self.exploration_coef * np.sqrt(np.var(samples, axis=0)) + + preds += sample_pred.tolist() + users_ans += [idx] * df_test_idx.shape[0] + items_ans += df_test_idx["item_idx"].values.tolist() + + res_df = pd.DataFrame({"user_idx": users_ans, "item_idx": items_ans, "relevance": preds}) + pred = convert2spark(res_df) + return pred + + @property + def _init_args(self): + return { + "n_epochs": self.n_epochs, + "union_cols": self.union_cols, + "cnt_users": self.cnt_users, + "cnt_items": self.cnt_items, + "size_wide_features": self.size_wide_features, + "size_continuous_features": self.size_continuous_features, + "size_cat_features": self.size_cat_features, + } + + def model_save(self, dir_name): + """ + This function saves 
the model. + """ + os.makedirs(dir_name, exist_ok=True) + + joblib.dump(self.scaler_user, os.path.join(dir_name, "scaler_user.joblib")) + joblib.dump(self.encoder_intersept_user, os.path.join(dir_name, "encoder_intersept_user.joblib")) + joblib.dump(self.encoder_diff_user, os.path.join(dir_name, "encoder_diff_user.joblib")) + + joblib.dump(self.scaler_item, os.path.join(dir_name, "scaler_item.joblib")) + joblib.dump(self.encoder_intersept_item, os.path.join(dir_name, "encoder_intersept_item.joblib")) + joblib.dump(self.encoder_diff_item, os.path.join(dir_name, "encoder_diff_item.joblib")) + + torch.save(self.model.state_dict(), os.path.join(dir_name, "model_weights.pth")) + torch.save( + { + "fit_users": self.fit_users.toPandas(), + "fit_items": self.fit_items.toPandas(), + }, + os.path.join(dir_name, "fit_info.pth"), + ) + + def model_load(self, dir_name): + """ + This function loads the model. + """ + self.scaler_user = joblib.load(os.path.join(dir_name, "scaler_user.joblib")) + self.encoder_intersept_user = joblib.load(os.path.join(dir_name, "encoder_intersept_user.joblib")) + self.encoder_diff_user = joblib.load(os.path.join(dir_name, "encoder_diff_user.joblib")) + + self.scaler_item = joblib.load(os.path.join(dir_name, "scaler_item.joblib")) + self.encoder_intersept_item = joblib.load(os.path.join(dir_name, "encoder_intersept_item.joblib")) + self.encoder_diff_item = joblib.load(os.path.join(dir_name, "encoder_diff_item.joblib")) + + self.model = WideDeep( + dim_head=self.dim_head, + deep_out_dim=self.deep_out_dim, + hidden_layers=self.hidden_layers, + size_wide_features=self.size_wide_features, + size_continuous_features=self.size_continuous_features, + size_cat_features=self.size_cat_features, + wide_out_dim=self.wide_out_dim, + head_dropout=self.head_dropout, + deep_dropout=self.deep_dropout, + cnt_users=self.cnt_users, + cnt_items=self.cnt_items, + user_embed=self.embedding_sizes[0], + item_embed=self.embedding_sizes[1], + crossed_embed=self.embedding_sizes[2], + ) + self.model.load_state_dict(torch.load(os.path.join(dir_name, "model_weights.pth"))) + + checkpoint = torch.load(os.path.join(dir_name, "fit_info.pth")) + self.fit_users = convert2spark(checkpoint["fit_users"]) + self.fit_items = convert2spark(checkpoint["fit_items"]) diff --git a/replay/experimental/scenarios/obp_wrapper/replay_offline.py b/replay/experimental/scenarios/obp_wrapper/replay_offline.py index 47fcc7a36..0403e352d 100644 --- a/replay/experimental/scenarios/obp_wrapper/replay_offline.py +++ b/replay/experimental/scenarios/obp_wrapper/replay_offline.py @@ -1,11 +1,6 @@ import logging from dataclasses import dataclass -from typing import ( - Any, - Dict, - List, - Optional, -) +from typing import Any, Dict, List, Optional, Union import numpy as np import pandas as pd @@ -15,13 +10,16 @@ from pyspark.sql import DataFrame from replay.data import Dataset, FeatureHint, FeatureInfo, FeatureSchema, FeatureType +from replay.experimental.models.base_rec import BaseRecommender as ExperimentalBaseRecommender from replay.experimental.scenarios.obp_wrapper.obp_optuna_objective import OBPObjective from replay.experimental.scenarios.obp_wrapper.utils import split_bandit_feedback from replay.models.base_rec import BaseRecommender from replay.utils.spark_utils import convert2spark -def obp2df(action: np.ndarray, reward: np.ndarray, timestamp: np.ndarray) -> Optional[pd.DataFrame]: +def obp2df( + action: np.ndarray, reward: np.ndarray, timestamp: np.ndarray, feedback_column: str +) -> Optional[pd.DataFrame]: """ Converts OBP 
log to the pandas DataFrame """ @@ -32,7 +30,7 @@ def obp2df(action: np.ndarray, reward: np.ndarray, timestamp: np.ndarray) -> Opt { "user_idx": np.arange(n_interactions), "item_idx": action, - "rating": reward, + feedback_column: reward, "timestamp": timestamp, } ) @@ -68,7 +66,7 @@ class OBPOfflinePolicyLearner(BaseOfflinePolicyLearner): Constructing inside the fit method. Used for predict of replay_model. """ - replay_model: Optional[BaseRecommender] = None + replay_model: Optional[Union[BaseRecommender, ExperimentalBaseRecommender]] = None log: Optional[DataFrame] = None max_usr_id: int = 0 item_features: DataFrame = None @@ -78,6 +76,13 @@ class OBPOfflinePolicyLearner(BaseOfflinePolicyLearner): def __post_init__(self) -> None: """Initialize Class.""" + + self.is_experimental_model = isinstance(self.replay_model, ExperimentalBaseRecommender) + if self.is_experimental_model: + self.feedback_column = "relevance" + else: + self.feedback_column = "rating" + self.feature_schema = FeatureSchema( [ FeatureInfo( @@ -136,7 +141,7 @@ def fit( :param action_context: Context vectors observed for each action. """ - log = convert2spark(obp2df(action, reward, timestamp)) + log = convert2spark(obp2df(action, reward, timestamp, self.feedback_column)) self.log = log user_features = None @@ -148,13 +153,16 @@ def fit( if action_context is not None: self.item_features = convert2spark(context2df(action_context, np.arange(self.n_actions), "item")) - dataset = Dataset( - feature_schema=self.feature_schema, - interactions=log, - query_features=user_features, - item_features=self.item_features, - ) - self.replay_model._fit_wrap(dataset) + if self.is_experimental_model: + self.replay_model._fit_wrap(log, user_features, self.item_features) + else: + dataset = Dataset( + feature_schema=self.feature_schema, + interactions=log, + query_features=user_features, + item_features=self.item_features, + ) + self.replay_model._fit_wrap(dataset) def predict(self, n_rounds: int = 1, context: np.ndarray = None) -> np.ndarray: """Predict best actions for new data. @@ -178,16 +186,21 @@ def predict(self, n_rounds: int = 1, context: np.ndarray = None) -> np.ndarray: self.max_usr_id += n_rounds - dataset = Dataset( - feature_schema=self.feature_schema, - interactions=self.log, - query_features=user_features, - item_features=self.item_features, - check_consistency=False, - ) - - action_dist = self.replay_model._predict_proba(dataset, self.len_list, users, items, filter_seen_items=False) - + if self.is_experimental_model: + action_dist = self.replay_model._predict_proba( + self.log, self.len_list, users, items, user_features, self.item_features, filter_seen_items=False + ) + else: + dataset = Dataset( + feature_schema=self.feature_schema, + interactions=self.log, + query_features=user_features, + item_features=self.item_features, + check_consistency=False, + ) + action_dist = self.replay_model._predict_proba( + dataset, self.len_list, users, items, filter_seen_items=False + ) return action_dist def optimize( diff --git a/replay/models/base_rec.py b/replay/models/base_rec.py index 1a46c7caa..6de6faff0 100644 --- a/replay/models/base_rec.py +++ b/replay/models/base_rec.py @@ -625,23 +625,21 @@ def _predict_proba( self, dataset: Dataset, k: int, queries: SparkDataFrame, items: SparkDataFrame, filter_seen_items: bool = True ) -> np.ndarray: """ - Inner method where model actually predicts. + Inner method where model actually predicts probability estimates. + + Mainly used in ```OBPOfflinePolicyLearner```. 
- :param log: historical log of interactions + :param dataset: historical interactions with query/item features ``[user_idx, item_idx, timestamp, rating]`` :param k: number of recommendations for each user - :param users: users to create recommendations for + :param queries: queries to create recommendations for dataframe containing ``[user_idx]`` or ``array-like``; - if ``None``, recommend to all users from ``log`` + if ``None``, recommend to all queries from ``interactions`` :param items: candidate items for recommendations dataframe containing ``[item_idx]`` or ``array-like``; - if ``None``, take all items from ``log``. + if ``None``, take all items from ``interactions``. If it contains new items, ``rating`` for them will be ``0``. - :param user_features: user features - ``[user_idx , timestamp]`` + feature columns - :param item_features: item features - ``[item_idx , timestamp]`` + feature columns - :param filter_seen_items: flag to remove seen items from recommendations based on ``log``. + :param filter_seen_items: flag to remove seen items from recommendations based on ``interactions``. :return: distribution over items for each user with shape ``(n_users, n_items, k)`` where we have probability for each user to choose item at fixed position(top-k). @@ -1644,23 +1642,21 @@ def _predict_proba( self, dataset: Dataset, k: int, queries: SparkDataFrame, items: SparkDataFrame, filter_seen_items: bool = True ) -> np.ndarray: """ - Inner method where model actually predicts. + Inner method where model actually predicts probability estimates. + + Mainly used in ```OBPOfflinePolicyLearner```. - :param log: historical log of interactions + :param dataset: historical interactions with query/item features ``[user_idx, item_idx, timestamp, rating]`` :param k: number of recommendations for each user - :param users: users to create recommendations for + :param queries: queries to create recommendations for dataframe containing ``[user_idx]`` or ``array-like``; - if ``None``, recommend to all users from ``log`` + if ``None``, recommend to all queries from ``interactions`` :param items: candidate items for recommendations dataframe containing ``[item_idx]`` or ``array-like``; - if ``None``, take all items from ``log``. + if ``None``, take all items from ``interactions``. If it contains new items, ``rating`` for them will be ``0``. - :param user_features: user features - ``[user_idx , timestamp]`` + feature columns - :param item_features: item features - ``[item_idx , timestamp]`` + feature columns - :param filter_seen_items: flag to remove seen items from recommendations based on ``log``. + :param filter_seen_items: flag to remove seen items from recommendations based on ``interactions``. :return: distribution over items for each user with shape ``(n_users, n_items, k)`` where we have probability for each user to choose item at fixed position(top-k). 
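
The `(n_users, n_items, k)` distribution documented above is assembled position by position from the top-k recommendations, as in the `_predict_proba` added to the experimental base recommender earlier in this diff. Below is a minimal NumPy sketch of that indexing; the toy sizes and the hypothetical `top_k_items` array stand in for the collected Spark recommendations and are assumptions for illustration only.

```python
import numpy as np

# Toy setup: 3 users, 5 candidate items, top-2 recommendations per user.
n_users, n_items, k = 3, 5, 2

# Each row holds one user's recommended item indices, best first
# (hypothetical stand-in for the collected `recs_items` Spark frame).
top_k_items = np.array([
    [4, 1],
    [0, 3],
    [2, 4],
])

# Same construction as in `_predict_proba`: for every slot i of the
# top-k list, put probability mass 1 on the item recommended at that slot.
action_dist = np.zeros((n_users, n_items, k))
for i in range(k):
    action_dist[np.arange(n_users), top_k_items[:, i], i] += 1

# `action_dist[u, :, i]` is now a one-hot distribution over items for
# user u at position i, matching the shape expected by OBP estimators.
assert action_dist.shape == (n_users, n_items, k)
assert action_dist[0, 4, 0] == 1.0
```
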
diff --git a/tests/experimental/models/test_neural_ts.py b/tests/experimental/models/test_neural_ts.py new file mode 100644 index 000000000..ca50de2ea --- /dev/null +++ b/tests/experimental/models/test_neural_ts.py @@ -0,0 +1,225 @@ +from datetime import datetime + +import numpy as np +import pytest +from obp.dataset import OpenBanditDataset +from obp.ope import DirectMethod, DoublyRobust, InverseProbabilityWeighting, OffPolicyEvaluation + +pyspark = pytest.importorskip("pyspark") +torch = pytest.importorskip("torch") + +from pyspark.sql import functions as sf + +from replay.data import get_schema +from replay.experimental.models import NeuralTS +from replay.experimental.scenarios.obp_wrapper.replay_offline import OBPOfflinePolicyLearner +from replay.experimental.scenarios.obp_wrapper.utils import get_est_rewards_by_reg +from tests.utils import sparkDataFrameEqual + +SEED = 123 + + +@pytest.fixture(scope="module") +def log(spark): + date1 = datetime(2019, 1, 1) + date2 = datetime(2019, 1, 2) + date3 = datetime(2019, 1, 3) + return spark.createDataFrame( + data=[ + [0, 0, date1, 1.0], + [1, 0, date1, 1.0], + [2, 1, date2, 2.0], + [2, 1, date2, 2.0], + [1, 1, date3, 2.0], + [2, 2, date3, 2.0], + [0, 2, date3, 2.0], + ], + schema=get_schema("user_idx", "item_idx", "timestamp", "relevance"), + ) + + +@pytest.fixture(scope="module") +def user_features(spark): + return spark.createDataFrame([(0, 2.0, 5.0), (1, 0.0, -5.0), (2, 4.0, 3.0)]).toDF( + "user_idx", "user_feature_cat", "user_feature_cont" + ) + + +@pytest.fixture(scope="module") +def item_features(spark): + return spark.createDataFrame([(0, 4.0, 5.0), (1, 5.0, 4.0), (2, 1.5, 3.3)]).toDF( + "item_idx", "item_feature_cat", "item_feature_cont" + ) + + +@pytest.fixture(scope="module") +def model(): + model = NeuralTS( + embedding_sizes=[2, 2, 4], + hidden_layers=[2, 5], + wide_out_dim=1, + deep_out_dim=1, + dim_head=1, + n_epochs=1, + use_gpu=False, + cnt_neg_samples=1, + cnt_samples_for_predict=2, + ) + + return model + + +@pytest.fixture(scope="module") +def model_with_features(): + cols_item = { + "continuous_cols": ["item_feature_cont"], + "cat_embed_cols": ["item_feature_cat"], + "wide_cols": ["item_feature_cat", "item_feature_cont"], + } + + cols_user = { + "continuous_cols": ["user_feature_cont"], + "cat_embed_cols": ["user_feature_cat"], + "wide_cols": ["user_feature_cat", "user_feature_cont"], + } + + model_with_features = NeuralTS( + user_cols=cols_user, + item_cols=cols_item, + embedding_sizes=[2, 2, 4], + hidden_layers=[2, 5], + wide_out_dim=1, + deep_out_dim=1, + dim_head=1, + n_epochs=1, + use_gpu=False, + cnt_neg_samples=1, + cnt_samples_for_predict=2, + ) + + return model_with_features + + +@pytest.mark.experimental +def test_equal_preds(model, user_features, item_features, log, tmp_path): + path = (tmp_path / "test").resolve() + model.fit(log, user_features=user_features, item_features=item_features) + torch.manual_seed(SEED) + + base_pred = model.predict(log, 5, user_features=user_features, item_features=item_features) + model.model_save(path) + model.model_load(path) + torch.manual_seed(SEED) + + new_pred = model.predict(log, 5, user_features=user_features, item_features=item_features) + sparkDataFrameEqual(base_pred, new_pred) + + +@pytest.mark.experimental +def test_use_features(model_with_features, user_features, item_features, log): + model_with_features.fit(log, user_features=user_features, item_features=item_features) + + users = user_features.select("user_idx").distinct() + items = 
item_features.select("item_idx").distinct() + + base_pred = model_with_features._predict( + log, 5, users=users, items=items, user_features=user_features, item_features=item_features + ) + + n_users = users.count() + n_items = items.count() + + assert base_pred.count() == n_users * n_items + + +@pytest.mark.experimental +def test_predict_pairs(log, user_features, item_features, model): + model.fit(log, user_features, item_features) + + pred = model.predict_pairs( + log.filter(sf.col("user_idx") == 0).select("user_idx", "item_idx"), + user_features=user_features, + item_features=item_features, + ) + assert pred.count() == 2 + assert pred.select("user_idx").distinct().collect()[0][0] == 0 + + pred = model.predict_pairs( + log.filter(sf.col("user_idx") == 1).select("user_idx", "item_idx"), + user_features=user_features, + item_features=item_features, + ) + assert pred.count() == 2 + assert pred.select("user_idx").distinct().collect()[0][0] == 1 + + +@pytest.mark.experimental +def test_predict_empty_log(model_with_features, user_features, item_features, log): + model_with_features.fit(log, user_features=user_features, item_features=item_features) + users = user_features.select("user_idx").distinct() + items = item_features.select("item_idx").distinct() + + model_with_features._predict( + log.limit(0), 1, users=users, items=items, user_features=user_features, item_features=item_features + ) + + +@pytest.fixture +def obp_dataset(): + dataset = OpenBanditDataset(behavior_policy="random", data_path=None, campaign="all") + return dataset + + +@pytest.fixture +def obp_learner(obp_dataset): + obp_model = NeuralTS( + embedding_sizes=[2, 2, 4], + hidden_layers=[2, 5], + wide_out_dim=1, + deep_out_dim=1, + dim_head=1, + n_epochs=1, + use_gpu=False, + cnt_neg_samples=1, + cnt_samples_for_predict=2, + cnt_users=obp_dataset.n_rounds, + ) + learner = OBPOfflinePolicyLearner( + n_actions=obp_dataset.n_actions, + replay_model=obp_model, + len_list=obp_dataset.len_list, + ) + return learner + + +def test_obp_evaluation(obp_dataset, obp_learner): + bandit_feedback_train, _ = obp_dataset.obtain_batch_bandit_feedback(test_size=0.3, is_timeseries_split=True) + _, bandit_feedback_test = obp_dataset.obtain_batch_bandit_feedback(test_size=0.3, is_timeseries_split=True) + + obp_learner.fit( + action=bandit_feedback_train["action"], + reward=bandit_feedback_train["reward"], + timestamp=np.arange(bandit_feedback_train["n_rounds"]), + context=bandit_feedback_train["context"], + action_context=bandit_feedback_train["action_context"], + ) + action_dist = obp_learner.predict( + bandit_feedback_test["n_rounds"], + bandit_feedback_test["context"], + ) + assert action_dist.shape == (bandit_feedback_test["n_rounds"], obp_learner.n_actions, obp_learner.len_list) + + ope = OffPolicyEvaluation( + bandit_feedback=bandit_feedback_test, + ope_estimators=[InverseProbabilityWeighting(), DirectMethod(), DoublyRobust()], + ) + estimated_rewards_by_reg_model = get_est_rewards_by_reg( + obp_dataset.n_actions, obp_dataset.len_list, bandit_feedback_test, bandit_feedback_test + ) + estimated_policy_value = ope.estimate_policy_values( + action_dist=action_dist, + estimated_rewards_by_reg_model=estimated_rewards_by_reg_model, + ) + assert "ipw" in estimated_policy_value + assert "dm" in estimated_policy_value + assert "dr" in estimated_policy_value
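
For reference, a minimal usage sketch of the new `NeuralTS` model, mirroring the fixtures in `tests/experimental/models/test_neural_ts.py`; the toy interaction log, feature column names, and tiny hyperparameters are illustrative assumptions, not a recommended configuration.

```python
from datetime import datetime

from replay.data import get_schema
from replay.experimental.models import NeuralTS
from replay.utils.session_handler import State

spark = State().session
schema = get_schema("user_idx", "item_idx", "timestamp", "relevance")

# Toy interaction log and feature tables in the layout used by the tests above.
log = spark.createDataFrame(
    [
        [0, 0, datetime(2019, 1, 1), 1.0],
        [1, 0, datetime(2019, 1, 1), 1.0],
        [2, 1, datetime(2019, 1, 2), 2.0],
        [1, 1, datetime(2019, 1, 3), 2.0],
        [2, 2, datetime(2019, 1, 3), 2.0],
        [0, 2, datetime(2019, 1, 3), 2.0],
    ],
    schema=schema,
)
user_features = spark.createDataFrame(
    [(0, 2.0, 5.0), (1, 0.0, -5.0), (2, 4.0, 3.0)]
).toDF("user_idx", "user_feature_cat", "user_feature_cont")
item_features = spark.createDataFrame(
    [(0, 4.0, 5.0), (1, 5.0, 4.0), (2, 1.5, 3.3)]
).toDF("item_idx", "item_feature_cat", "item_feature_cont")

model = NeuralTS(
    user_cols={
        "continuous_cols": ["user_feature_cont"],
        "cat_embed_cols": ["user_feature_cat"],
        "wide_cols": ["user_feature_cat", "user_feature_cont"],
    },
    item_cols={
        "continuous_cols": ["item_feature_cont"],
        "cat_embed_cols": ["item_feature_cat"],
        "wide_cols": ["item_feature_cat", "item_feature_cont"],
    },
    embedding_sizes=[2, 2, 4],
    hidden_layers=[2, 5],
    n_epochs=1,
    use_gpu=False,
    cnt_neg_samples=1,
    cnt_samples_for_predict=2,
)

# Fit on the interaction log plus side features, then score top-5 items per user.
model.fit(log, user_features=user_features, item_features=item_features)
recs = model.predict(log, 5, user_features=user_features, item_features=item_features)
recs.show()
```
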