forked from yuqingcuiyuki/recommendation-system
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpreprocess.py
185 lines (154 loc) · 6.27 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, row_number
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.evaluation import RankingEvaluator
from pyspark.sql.functions import collect_list, lit, array
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id
def _top100_truth(spark, source_view, msid_view="msid_mapping"):
    """Return a DataFrame of each user's top-100 recordings from *source_view*.

    For every user, recordings are ranked by play count (DENSE_RANK, ties share
    a rank), the top 100 ranks are kept, and the raw ``recording_msid`` strings
    are mapped to their numeric ``rank`` ids via *msid_view*.  The result is
    grouped so each row is ``(user_id, label)`` where ``label`` is the list of
    numeric recording ids — the shape expected by Spark's ranking metrics.

    Parameters
    ----------
    spark : SparkSession
        Active session; *source_view* and *msid_view* must already be registered.
    source_view : str
        Name of the temp view holding (user_id, recording_msid, ...) rows.
    msid_view : str
        Name of the temp view mapping recording_msid -> numeric rank id.
    """
    # Each temp view name is suffixed with the source view so that computing
    # truth for validation and test in the same session cannot clobber each
    # other's intermediate views (the original code reused step1/step2/step3).
    counts = f"counts_{source_view}"
    ranked = f"ranked_{source_view}"
    top = f"top_{source_view}"

    # Step 1: play count per (user, recording).
    spark.sql(f"""
        SELECT user_id, recording_msid, COUNT(*) AS count
        FROM {source_view}
        GROUP BY user_id, recording_msid
    """).createOrReplaceTempView(counts)

    # Step 2: rank recordings per user by play count, most-played first.
    spark.sql(f"""
        SELECT user_id, recording_msid,
               DENSE_RANK() OVER (PARTITION BY user_id ORDER BY count DESC) AS count_rank
        FROM {counts}
    """).createOrReplaceTempView(ranked)

    # Step 3: keep the top-100 ranks for each user.
    spark.sql(f"""
        SELECT user_id, recording_msid
        FROM {ranked}
        WHERE count_rank <= 100
        ORDER BY user_id ASC, count_rank ASC
    """).createOrReplaceTempView(top)

    # Map raw msid strings to the shared numeric ids used by the rating matrix.
    top_100_truth = spark.sql(f"""
        SELECT user_id, rank AS recording_msid
        FROM {top} AS s
        LEFT JOIN {msid_view} AS m
          ON s.recording_msid = m.recording_msid
    """)

    # Collapse to one row per user with the id list under the column name
    # ("label") that RankingEvaluator / RankingMetrics expect.
    return (
        top_100_truth.groupBy("user_id")
        .agg(collect_list("recording_msid").alias("label"))
    )


def main(spark, userID):
    """Build the training rating matrix and the val/test ground-truth tables.

    Reads the train/validation/test interaction parquets from HDFS, assigns
    every recording_msid a stable numeric id (needed because ALS requires
    numeric item ids), then writes three parquet outputs:

    * ``train_mat.parquet``  — (user_id, recording_msid, rating) where rating
      is the user's play count for that recording.
    * ``val_truth.parquet``  — per-user top-100 listened recordings from the
      validation split, as (user_id, label-list).
    * ``test_truth.parquet`` — same, from the test split.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session.
    userID : str
        HDFS username from $USER.  NOTE(review): the input paths below are
        hard-coded to specific users rather than built from *userID* — confirm
        whether they should be f"/user/{userID}/..." before reuse.
    """
    # ---- load the three interaction splits and expose them as SQL views ----
    train = spark.read.parquet("/user/yc6285_nyu_edu/interactions_train.parquet")
    print('load train')
    train.createOrReplaceTempView("train_view")

    val = spark.read.parquet("/user/yc6285_nyu_edu/interactions_validation.parquet")
    val.createOrReplaceTempView("val_view")

    test = spark.read.parquet("/user/bm106_nyu_edu/1004-project-2023/interactions_test.parquet")
    test.createOrReplaceTempView("test_view")

    # ---- msid -> numeric id map over the union of all three splits ----
    # The UNION (not UNION ALL) deduplicates msids across splits, so every
    # recording seen anywhere gets exactly one id, consistent everywhere.
    spark.sql("""
        SELECT recording_msid,
               CAST(ROW_NUMBER() OVER (ORDER BY recording_msid) AS DOUBLE) AS rank
        FROM (
            SELECT DISTINCT recording_msid FROM train_view
            UNION
            SELECT DISTINCT recording_msid FROM val_view
            UNION
            SELECT DISTINCT recording_msid FROM test_view
        ) combined_views
    """).createOrReplaceTempView("msid_mapping")

    # ---- implicit "rating" matrix: play count per (user, recording id) ----
    print('create rating matrix for train')
    train_mat = spark.sql("""
        SELECT t.user_id, rank AS recording_msid, rating
        FROM (
            SELECT user_id, recording_msid, COUNT(*) AS rating
            FROM train_view
            GROUP BY 1, 2
        ) AS t
        LEFT JOIN msid_mapping AS m
          ON t.recording_msid = m.recording_msid
    """)
    train_mat.show(5)
    # NOTE(review): .save() fails if the path already exists; add
    # .mode("overwrite") if re-runs should replace previous output.
    train_mat.write.format("parquet").save("train_mat.parquet")
    print("train mat exported")

    # ---- ground truth (top-100 per user) for validation and test ----
    print('get truth from val')
    val_truth = _top100_truth(spark, "val_view")
    val_truth.show(5)
    val_truth.write.format("parquet").save("val_truth.parquet")
    print("val truth exported")

    print('get truth from test')
    test_truth = _top100_truth(spark, "test_view")
    test_truth.show(5)
    test_truth.write.format("parquet").save("test_truth.parquet")
    print("test truth exported")
# Only enter this block if we're in main
# Only enter this block if we're run as a script.
if __name__ == "__main__":
    # Executor/driver memory, Kryo serialization, and a 100 MB broadcast-join
    # threshold (the msid_mapping join benefits from broadcasting).
    config = SparkConf().setAll([
        ('spark.executor.memory', '8g'),
        ('spark.driver.memory', '8g'),
        ('spark.blacklist.enabled', False),
        ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
        ('spark.sql.autoBroadcastJoinThreshold', 100 * 1024 * 1024),
    ])
    # BUG FIX: the original built `config` but never passed it to the builder,
    # so every setting above was silently ignored. `conf=config` applies them.
    spark = (
        SparkSession.builder
        .appName('preprocess')
        .config(conf=config)
        .getOrCreate()
    )
    # $USER identifies the caller's HDFS home directory.
    userID = os.environ['USER']
    # Call our main routine.
    main(spark, userID)