Implemented Spark item to item recommenders #1809

Merged 8 commits on Sep 20, 2022
Changes from all commits
contrib/sarplus/python/pysarplus/SARPlus.py (187 changes: 146 additions & 41 deletions)
@@ -12,6 +12,7 @@
)
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pysarplus import SARModel
import pyspark.sql.functions as F


SIM_COOCCUR = "cooccurrence"
@@ -84,6 +85,7 @@ def __init__(
self.similarity_type = similarity_type
self.timedecay_formula = timedecay_formula
self.item_similarity = None
self.item_frequencies = None
self.cache_path = cache_path

def _format(self, string, **kwargs):
@@ -116,13 +118,16 @@ def fit(self, df):
# following is the query which we want to run

if self.header["time_now"] is None:
query = self._format("""
query = self._format(
"""
SELECT CAST(MAX(`{col_timestamp}`) AS long)
FROM `{prefix}df_train_input`
""")
"""
)
self.header["time_now"] = self.spark.sql(query).first()[0]

query = self._format("""
query = self._format(
"""
SELECT `{col_user}`,
`{col_item}`,
SUM(
@@ -132,14 +137,16 @@ def fit(self, df):
FROM `{prefix}df_train_input`
GROUP BY `{col_user}`, `{col_item}`
CLUSTER BY `{col_user}`
""")
"""
)

# replace with time-decayed version
df = self.spark.sql(query)
else:
if self.header["col_timestamp"] in df.columns:
# we need to de-duplicate items by using the latest item
query = self._format("""
query = self._format(
"""
SELECT `{col_user}`, `{col_item}`, `{col_rating}`
FROM (
SELECT `{col_user}`,
@@ -152,7 +159,8 @@ def fit(self, df):
WINDOW user_item_win AS (
PARTITION BY `{col_user}`,`{col_item}`
ORDER BY `{col_timestamp}` DESC)
""")
"""
)

df = self.spark.sql(query)

@@ -161,7 +169,8 @@ def fit(self, df):
log.info("sarplus.fit 1/2: compute item cooccurrences...")

# compute cooccurrence above minimum threshold
query = self._format("""
query = self._format(
"""
SELECT a.`{col_item}` AS i1,
b.`{col_item}` AS i2,
COUNT(*) AS value
@@ -171,42 +180,54 @@ def fit(self, df):
GROUP BY i1, i2
HAVING value >= {threshold}
CLUSTER BY i1, i2
""")
"""
)

item_cooccurrence = self.spark.sql(query)
item_cooccurrence.write.mode("overwrite").saveAsTable(
self._format("{prefix}item_cooccurrence")
)

# compute item frequencies
self.item_frequencies = item_cooccurrence.filter(
F.col("i1") == F.col("i2")
).select(F.col("i1").alias("item_id"), F.col("value").alias("frequency"))

Comment on lines +191 to +195
Collaborator:
Question to @simonzhaoms: will the item-to-item function have any relationship with the work you are doing on the new similarities?
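For context: the co-occurrence table computed above counts, for each item pair (i1, i2), the users who interacted with both items, so its diagonal entries (i1 == i2) are simply each item's occurrence count; that is what item_frequencies captures, and the same diagonal supplies the margins used below for Jaccard (value / (m1.margin + m2.margin - value)) and Lift (value / (m1.margin * m2.margin)). A minimal, self-contained sketch of this relationship (illustrative only, not part of the PR; the toy data and helper names are assumptions):

from collections import Counter

# Toy interaction data: (user, item) pairs, already de-duplicated.
interactions = {("u1", "a"), ("u2", "a"), ("u3", "a"), ("u1", "b"), ("u2", "b")}

def cooccurrence(i1, i2):
    # Number of users who interacted with both i1 and i2.
    users_i1 = {u for u, i in interactions if i == i1}
    users_i2 = {u for u, i in interactions if i == i2}
    return len(users_i1 & users_i2)

frequency = Counter(i for _, i in interactions)

# The diagonal of the co-occurrence matrix equals the item frequency.
assert cooccurrence("a", "a") == frequency["a"] == 3
assert cooccurrence("b", "b") == frequency["b"] == 2

# Jaccard then follows directly from these counts, e.g. for items a and b:
# jaccard(a, b) = cooccurrence(a, b) / (frequency["a"] + frequency["b"] - cooccurrence(a, b))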

# compute the diagonal used later for Jaccard and Lift
if self.similarity_type == SIM_LIFT or self.similarity_type == SIM_JACCARD:
query = self._format("""
query = self._format(
"""
SELECT i1 AS i, value AS margin
FROM `{prefix}item_cooccurrence`
WHERE i1 = i2
""")
"""
)
item_marginal = self.spark.sql(query)
item_marginal.createOrReplaceTempView(self._format("{prefix}item_marginal"))

if self.similarity_type == SIM_COOCCUR:
self.item_similarity = item_cooccurrence
elif self.similarity_type == SIM_JACCARD:
query = self._format("""
query = self._format(
"""
SELECT i1, i2, value / (m1.margin + m2.margin - value) AS value
FROM `{prefix}item_cooccurrence` AS a
INNER JOIN `{prefix}item_marginal` AS m1 ON a.i1 = m1.i
INNER JOIN `{prefix}item_marginal` AS m2 ON a.i2 = m2.i
CLUSTER BY i1, i2
""")
"""
)
self.item_similarity = self.spark.sql(query)
elif self.similarity_type == SIM_LIFT:
query = self._format("""
query = self._format(
"""
SELECT i1, i2, value / (m1.margin * m2.margin) AS value
FROM `{prefix}item_cooccurrence` AS a
INNER JOIN `{prefix}item_marginal` AS m1 ON a.i1 = m1.i
INNER JOIN `{prefix}item_marginal` AS m2 ON a.i2 = m2.i
CLUSTER BY i1, i2
""")
"""
)
self.item_similarity = self.spark.sql(query)
else:
raise ValueError(
@@ -223,7 +244,8 @@ def fit(self, df):

# expand upper triangular to full matrix

query = self._format("""
query = self._format(
"""
SELECT i1, i2, value
FROM (
(
@@ -238,7 +260,8 @@ def fit(self, df):
)
)
CLUSTER BY i1
""")
"""
)

self.item_similarity = self.spark.sql(query)
self.item_similarity.write.mode("overwrite").saveAsTable(
@@ -249,9 +272,7 @@ def fit(self, df):
self.spark.sql(self._format("DROP TABLE `{prefix}item_cooccurrence`"))
self.spark.sql(self._format("DROP TABLE `{prefix}item_similarity_upper`"))

self.item_similarity = self.spark.table(
self._format("{prefix}item_similarity")
)
self.item_similarity = self.spark.table(self._format("{prefix}item_similarity"))

def get_user_affinity(self, test):
"""Prepare test set for C++ SAR prediction code.
@@ -262,18 +283,21 @@ def get_user_affinity(self, test):
"""
test.createOrReplaceTempView(self._format("{prefix}df_test"))

query = self._format("""
query = self._format(
"""
SELECT DISTINCT `{col_user}`
FROM `{prefix}df_test`
CLUSTER BY `{col_user}`
""")
"""
)

df_test_users = self.spark.sql(query)
df_test_users.write.mode("overwrite").saveAsTable(
self._format("{prefix}df_test_users")
)

query = self._format("""
query = self._format(
"""
SELECT a.`{col_user}`,
a.`{col_item}`,
CAST(a.`{col_rating}` AS double) AS `{col_rating}`
@@ -282,7 +306,8 @@ def get_user_affinity(self, test):
ON a.`{col_user}` = b.`{col_user}`
DISTRIBUTE BY `{col_user}`
SORT BY `{col_user}`, `{col_item}`
""")
"""
)

return self.spark.sql(query)

@@ -298,26 +323,30 @@ def _recommend_k_items_fast(

# create item id to continuous index mapping
log.info("sarplus.recommend_k_items 1/3: create item index")
query = self._format("""
query = self._format(
"""
SELECT i1, ROW_NUMBER() OVER(ORDER BY i1)-1 AS idx
FROM (
SELECT DISTINCT i1
FROM `{prefix}item_similarity`
)
CLUSTER BY i1
""")
"""
)
self.spark.sql(query).write.mode("overwrite").saveAsTable(
self._format("{prefix}item_mapping")
)

# map similarity matrix into index space
query = self._format("""
query = self._format(
"""
SELECT a.idx AS i1, b.idx AS i2, is.value
FROM `{prefix}item_similarity` AS is,
`{prefix}item_mapping` AS a,
`{prefix}item_mapping` AS b
WHERE is.i1 = a.i1 AND i2 = b.i1
""")
"""
)
self.spark.sql(query).write.mode("overwrite").saveAsTable(
self._format("{prefix}item_similarity_mapped")
)
@@ -336,21 +365,24 @@ def _recommend_k_items_fast(
# export similarity matrix for C++ backed UDF
log.info("sarplus.recommend_k_items 2/3: prepare similarity matrix")

query = self._format("""
query = self._format(
"""
SELECT i1, i2, CAST(value AS DOUBLE) AS value
FROM `{prefix}item_similarity_mapped`
ORDER BY i1, i2
""")
self.spark.sql(query).coalesce(1).write.format(
"com.microsoft.sarplus"
).mode("overwrite").save(cache_path_output)
"""
)
self.spark.sql(query).coalesce(1).write.format("com.microsoft.sarplus").mode(
"overwrite"
).save(cache_path_output)

self.get_user_affinity(test).createOrReplaceTempView(
self._format("{prefix}user_affinity")
)

# map item ids to index space
query = self._format("""
query = self._format(
"""
SELECT `{col_user}`, idx, rating
FROM (
SELECT `{col_user}`, b.idx, `{col_rating}` AS rating
@@ -359,15 +391,14 @@ def _recommend_k_items_fast(
ON `{col_item}` = b.i1
)
CLUSTER BY `{col_user}`
""")
"""
)
pred_input = self.spark.sql(query)

schema = StructType(
[
StructField(
"userID",
pred_input.schema[self.header["col_user"]].dataType,
True
"userID", pred_input.schema[self.header["col_user"]].dataType, True
),
StructField("itemID", IntegerType(), True),
StructField("score", FloatType(), True),
Expand Down Expand Up @@ -410,11 +441,13 @@ def sar_predict_udf(df):

df_preds.createOrReplaceTempView(self._format("{prefix}predictions"))

query = self._format("""
query = self._format(
"""
SELECT userID AS `{col_user}`, b.i1 AS `{col_item}`, score
FROM `{prefix}predictions` AS p, `{prefix}item_mapping` AS b
WHERE p.itemID = b.idx
""")
"""
)
return self.spark.sql(query)

def _recommend_k_items_slow(self, test, top_k=10, remove_seen=True):
@@ -437,7 +470,8 @@ def _recommend_k_items_slow(self, test, top_k=10, remove_seen=True):

# user_affinity * item_similarity
# filter top-k
query = self._format("""
query = self._format(
"""
SELECT `{col_user}`, `{col_item}`, score
FROM (
SELECT df.`{col_user}`,
@@ -453,7 +487,9 @@ def _recommend_k_items_slow(self, test, top_k=10, remove_seen=True):
ORDER BY SUM(df.`{col_rating}` * s.value) DESC)
)
WHERE rank <= {top_k}
""", top_k=top_k)
""",
top_k=top_k,
)

return self.spark.sql(query)

@@ -488,3 +524,72 @@ def recommend_k_items(
)
else:
raise ValueError("No cache_path specified")

def get_topk_most_similar_users(self, test, user, top_k=10):
Collaborator:
One question: have you checked that the results between the CPU version and this one are the same?

Contributor (author):
Yes, we are using the same test cases as for the CPU version.

"""Based on user affinity towards items, calculate the top k most
similar users from test dataframe to the given user.

Args:
test (pyspark.sql.DataFrame): test Spark dataframe.
user (int): user to retrieve most similar users for.
top_k (int): number of most similar users to return.

Returns:
pyspark.sql.DataFrame: Spark dataframe with top k most similar users
from test and their similarity scores in descending order.
"""

if len(test.filter(test["user_id"].contains(user)).collect()) == 0:
raise ValueError("Target user must exist in the input dataframe")

test_affinity = self.get_user_affinity(test).alias("matrix")
num_test_users = test_affinity.select("user_id").distinct().count() - 1

if num_test_users < top_k:
log.warning(
"Number of users is less than top_k, limiting top_k to number of users"
)
k = min(top_k, num_test_users)

user_affinity = test_affinity.where(F.col("user_id") == user).alias("user")

df_similar_users = (
test_affinity.join(
user_affinity,
test_affinity["item_id"] == user_affinity["item_id"],
"outer",
)
.withColumn(
"prod",
F.when(F.col("matrix.user_id") == user, -float("inf"))
.when(
F.col("user.rating").isNotNull(),
F.col("matrix.rating") * F.col("user.rating"),
)
.otherwise(0.0),
)
.groupBy("matrix.user_id")
.agg(F.sum("prod").alias("similarity"))
.orderBy("similarity", ascending=False)
.limit(k)
)

return df_similar_users
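A minimal usage sketch of this method (hypothetical, not part of the PR: it assumes a SARPlus instance named model constructed with col_user="user_id", col_item="item_id", and col_rating="rating", the column names the method refers to directly, a Spark dataframe test with those columns, and an existing user id 42):

# Hypothetical usage: the 5 users in `test` most similar to user 42,
# based on the dot product of their affinity (rating) vectors over shared items.
similar_users = model.get_topk_most_similar_users(test, user=42, top_k=5)
similar_users.show()
# Returns a dataframe with columns user_id and similarity, in descending order of similarity.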

def get_popularity_based_topk(self, top_k=10, items=True):
"""Get top K most frequently occurring items across all users.

Args:
top_k (int): number of top items to recommend.
items (bool): if False, return most frequent users instead (not yet implemented).

Returns:
pyspark.sql.DataFrame: Spark dataframe with top k most popular items
and their frequencies in descending order.
"""

# TODO: get most frequent users
if not items:
raise ValueError("Not implemented")

return self.item_frequencies.orderBy("frequency", ascending=False).limit(top_k)
Collaborator:
I noticed that the sarplus tests are triggered, but there is a badge on the main page that is failing: https://github.com/microsoft/recommenders/tree/staging/contrib/sarplus. @simonzhaoms, do you know what the problem is?
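For reference, a minimal usage sketch of the new get_popularity_based_topk method (hypothetical, not part of the PR: it assumes a SARPlus instance named model on which fit() has already been called, since item_frequencies is populated during fit):

# Hypothetical usage: the 10 most frequently occurring items in the training data.
popular_items = model.get_popularity_based_topk(top_k=10, items=True)
popular_items.show()
# Returns a dataframe with columns item_id and frequency, in descending order of frequency.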
