-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from netesy/dev
Version 0.0.1: Initial Release
- Loading branch information
Showing
14 changed files
with
601 additions
and
254 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,153 @@ | ||
from caidin.models.recommendation_engine import RecommendationEngine | ||
import math | ||
from collections import defaultdict | ||
from caidin.algorithms.recommendation_engine import RecommendationEngine | ||
|
||
|
||
class CollaborativeFiltering(RecommendationEngine): | ||
def __init__(self): | ||
super().__init__() | ||
def __init__(self, data, records, filters): | ||
self.data = data | ||
self.records = records | ||
self.filters = filters | ||
self.user_field = None | ||
self.item_field = None | ||
self.rating_field = None | ||
self.user_item_matrix = None | ||
self.user_similarity_matrix = None | ||
|
||
# def users(self): | ||
# return self | ||
def recommend(self): | ||
# Check if user_field, item_field, and rating_field are specified | ||
if not all( | ||
field in self.filters | ||
for field in ["user_field", "item_field", "rating_field"] | ||
): | ||
raise ValueError( | ||
"Please specify 'user_field', 'item_field', and 'rating_field' using the 'where' method." | ||
) | ||
|
||
# def items(self): | ||
# return self | ||
# Filter records based on user-defined criteria | ||
filtered_records = self.records | ||
for category, value in self.filters.items(): | ||
if category in filtered_records and filtered_records[category] == value: | ||
continue | ||
else: | ||
return [] # No recommendations if filters don't match | ||
|
||
def load(self, data): | ||
for user, item_ratings in data.items(): | ||
self.add_item(user, item_ratings) | ||
# Build user-item matrix and user similarity matrix | ||
self.build_user_item_matrix() | ||
|
||
return self | ||
if not self.user_field or not self.item_field or not self.rating_field: | ||
raise ValueError( | ||
"Please specify 'user_field', 'item_field', and 'rating_field' using the 'where' method." | ||
) | ||
|
||
self.build_user_similarity_matrix() | ||
|
||
# Get the index of the user that matches the given user_id | ||
user_idx = self.data[ | ||
self.data[self.user_field] == self.filters["user_id"] | ||
].index[0] | ||
|
||
# Calculate recommendations based on user similarities | ||
recommendations = self.calculate_recommendations(user_idx) | ||
|
||
def train(self): | ||
# Calculate similarity matrix based on ratings | ||
self.similarity_matrix = {} | ||
for item1 in self.items: | ||
self.similarity_matrix[item1.item_id] = {} | ||
for item2 in self.items: | ||
similarity = self.calculate_similarity(item1, item2) | ||
self.similarity_matrix[item1.item_id][item2.item_id] = similarity | ||
|
||
def get(self, user, num_recommendations=5): | ||
recommendations = self.recommend(user, num_recommendations) | ||
return recommendations | ||
|
||
def where(self, user_field=None, item_field=None, rating_field=None, **kwargs):
    """Record which field names hold users, items and ratings.

    Each of ``user_field``, ``item_field`` and ``rating_field`` is copied
    onto the attribute of the same name only when a truthy value is given;
    falsy values (including the ``None`` default) leave the attribute as it
    was.  Extra keyword arguments are accepted but not used here.

    Returns:
        ``self``, so calls can be chained.
    """
    supplied = (
        ("user_field", user_field),
        ("item_field", item_field),
        ("rating_field", rating_field),
    )
    for attribute, field_name in supplied:
        if field_name:
            setattr(self, attribute, field_name)
    return self
|
||
def build_user_item_matrix(self):
    """Populate ``self.user_item_matrix`` from ``self.records``.

    Walks ``self.records.iterrows()`` and, for every row, files the value
    under ``self.rating_field`` into a nested dict keyed first by the value
    under ``self.user_field`` and then by the value under
    ``self.item_field``.  A later row with the same user/item pair replaces
    the earlier rating.  The finished dict is stored on the instance;
    nothing is returned.
    """
    ratings_by_user = {}

    for _index, entry in self.records.iterrows():
        who = entry[self.user_field]
        what = entry[self.item_field]
        score = entry[self.rating_field]
        # setdefault creates the per-user dict the first time a user is seen.
        ratings_by_user.setdefault(who, {})[what] = score

    self.user_item_matrix = ratings_by_user
|
||
def build_user_similarity_matrix(self):
    """Populate ``self.user_similarity_matrix`` for every ordered user pair.

    For each key of ``self.user_item_matrix`` a row dict is created that
    maps every *other* key to ``self.calculate_user_similarity(user, other)``.
    A user is never compared with itself, so rows have no self entry.
    The result is stored on the instance; nothing is returned.
    """
    everyone = list(self.user_item_matrix.keys())

    self.user_similarity_matrix = {
        subject: {
            peer: self.calculate_user_similarity(subject, peer)
            for peer in everyone
            if not subject == peer
        }
        for subject in everyone
    }
|
||
def calculate_user_similarity(self, user1, user2):
    """Return the Pearson correlation of two users over their shared items.

    Only items present in both users' rows of ``self.user_item_matrix`` are
    considered, and each user's mean is taken over those shared items only.

    Returns:
        ``0`` when the users share no items or when either user's shared
        ratings are all equal (zero variance); otherwise the correlation
        coefficient as a float.
    """
    first = self.user_item_matrix[user1]
    second = self.user_item_matrix[user2]

    common = set(first.keys()) & set(second.keys())
    if len(common) == 0:
        return 0  # nothing to compare

    xs = [first[key] for key in common]
    ys = [second[key] for key in common]

    x_bar = sum(xs) / len(xs)
    y_bar = sum(ys) / len(ys)

    # Deviations from each user's own mean over the shared items.
    x_dev = [x - x_bar for x in xs]
    y_dev = [y - y_bar for y in ys]

    cross_term = sum(dx * dy for dx, dy in zip(x_dev, y_dev))
    x_spread = sum(dx ** 2 for dx in x_dev)
    y_spread = sum(dy ** 2 for dy in y_dev)

    if x_spread == 0 or y_spread == 0:
        return 0  # a constant rating vector has no defined correlation

    return cross_term / math.sqrt(x_spread * y_spread)
|
||
def calculate_recommendations(self, user_idx):
    """Rank items the user has not rated by similarity-weighted rating.

    The user is looked up as ``self.data.iloc[user_idx][self.user_field]``.
    For every item rated by at least one other user and absent from this
    user's row of ``self.user_item_matrix``, the predicted rating is::

        sum(similarity * rating) / sum(abs(similarity))

    taken over the other users who actually rated that item.  A missing
    rating is skipped instead of being counted as a rating of 0.

    Args:
        user_idx: Position passed to ``self.data.iloc``.
            NOTE(review): the visible caller derives this from ``.index[0]``
            (a label); that only matches ``iloc`` for a default integer
            index -- confirm.

    Returns:
        Item ids ordered from highest to lowest predicted rating.  Empty
        when the user has no similarity row or no neighbour with non-zero
        similarity has rated an unseen item.
    """
    user = self.data.iloc[user_idx][self.user_field]
    user_ratings = self.user_item_matrix.get(user, {})
    # A user with no similarity row has no neighbours to borrow ratings from.
    similarities = self.user_similarity_matrix.get(user, {})

    weighted_sums = {}
    similarity_sums = {}

    for other_user, other_ratings in self.user_item_matrix.items():
        if other_user == user:
            continue

        similarity = similarities.get(other_user, 0)
        if similarity == 0:
            # Contributes nothing to either sum; skipping also guarantees
            # every accumulated similarity sum is strictly positive.
            continue

        # Iterate over the neighbour's *items*: the outer keys of
        # user_item_matrix are users, so they are not candidate items.
        for item, rating in other_ratings.items():
            if item in user_ratings:
                continue
            weighted_sums[item] = weighted_sums.get(item, 0) + similarity * rating
            similarity_sums[item] = similarity_sums.get(item, 0) + abs(similarity)

    recommendations = [
        (item, weighted_sums[item] / similarity_sums[item])
        for item in weighted_sums
    ]
    recommendations.sort(key=lambda pair: pair[1], reverse=True)

    return [item for item, _ in recommendations]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,110 @@ | ||
from caidin.models.recommendation_engine import RecommendationEngine | ||
import math | ||
from collections import defaultdict | ||
from caidin.algorithms.recommendation_engine import RecommendationEngine | ||
|
||
|
||
class ContentBased(RecommendationEngine): | ||
def __init__(self): | ||
super().__init__() | ||
def __init__(self, data, records, filters, content_field="content"): | ||
self.data = data | ||
self.records = records | ||
self.filters = filters | ||
self.content_field = content_field | ||
self.tfidf_matrix = None | ||
self.idf_dict = {} | ||
self.item_index = {} | ||
self.item_content = [] | ||
|
||
# def items(self): | ||
# return self | ||
def recommend(self): | ||
# Filter records based on user-defined criteria | ||
filtered_records = self.records | ||
for category, value in self.filters.items(): | ||
if category in filtered_records and filtered_records[category] == value: | ||
continue | ||
else: | ||
return [] # No recommendations if filters don't match | ||
|
||
def load(self, data): | ||
for item, item_features in data.items(): | ||
self.add_item(item, item_features) | ||
# Calculate content similarity using TF-IDF | ||
self.calculate_tfidf_matrix() | ||
|
||
return self | ||
# Get the index of the item that matches the given item_id | ||
item_idx = self.item_index.get(self.filters["item_id"]) | ||
if item_idx is None: | ||
return [] # Item not found | ||
|
||
def train(self): | ||
# Calculate feature similarity matrix | ||
self.similarity_matrix = {} | ||
for item1 in self.items: | ||
self.similarity_matrix[item1.item_id] = {} | ||
for item2 in self.items: | ||
similarity = self.calculate_similarity(item1, item2) | ||
self.similarity_matrix[item1.item_id][item2.item_id] = similarity | ||
# Calculate cosine similarity between items | ||
sim_scores = self.calculate_cosine_similarity(item_idx) | ||
|
||
# Sort the items by similarity scores | ||
sim_scores = sorted(enumerate(sim_scores), key=lambda x: x[1], reverse=True) | ||
|
||
# Get the top N similar items (excluding the input item) | ||
top_similar_items = sim_scores[ | ||
1:11 | ||
] # Assuming you want the top 10 recommendations | ||
|
||
# Extract the item indices | ||
recommended_item_indices = [i[0] for i in top_similar_items] | ||
|
||
# Get the recommended items | ||
recommendations = [self.data[i] for i in recommended_item_indices] | ||
|
||
def get(self, item, num_recommendations=5): | ||
recommendations = self.recommend(item, num_recommendations) | ||
return recommendations | ||
|
||
def calculate_tfidf_matrix(self):
    """Build per-item TF-IDF vectors from ``self.data``.

    Works in two passes so that IDF is known before any weight is computed:

    1. Tokenise each item's ``self.content_field`` text (lower-cased,
       whitespace-split), count terms, and count in how many items each
       term occurs.
    2. Weight each term as ``tf * idf`` where
       ``tf = 0.5 + 0.5 * count / max_count_in_item`` and
       ``idf = log(num_items / (1 + document_frequency))``.

    All derived state (``tfidf_matrix``, ``idf_dict``, ``item_index``,
    ``item_content``) is rebuilt from scratch, so repeated calls give the
    same result instead of accumulating counts.

    Assumes ``self.data`` is a sized iterable of dict-like items that each
    have an ``"item_id"`` key -- TODO confirm against callers.

    NOTE(review): with this IDF form a term present in every item gets a
    negative weight, and a term in exactly ``num_items - 1`` items gets 0.
    The formula is kept as written.
    """
    num_items = len(self.data)

    self.tfidf_matrix = []
    self.idf_dict = {}
    self.item_index = {}
    self.item_content = []

    # Pass 1: term counts per item and document frequency per term.
    term_counts_per_item = []
    document_frequency = defaultdict(int)

    for position, item in enumerate(self.data):
        words = item.get(self.content_field, "").lower().split()

        term_counts = defaultdict(int)
        for word in words:
            term_counts[word] += 1

        term_counts_per_item.append(term_counts)
        self.item_index[item["item_id"]] = position
        self.item_content.append(set(words))

        for word in term_counts:
            document_frequency[word] += 1

    self.idf_dict = {
        word: math.log(num_items / (1 + frequency))
        for word, frequency in document_frequency.items()
    }

    # Pass 2: combine the augmented term frequency with the final IDF.
    for term_counts in term_counts_per_item:
        if not term_counts:
            self.tfidf_matrix.append({})  # item without any text
            continue

        max_count = max(term_counts.values())
        self.tfidf_matrix.append(
            {
                word: (0.5 + 0.5 * (count / max_count)) * self.idf_dict[word]
                for word, count in term_counts.items()
            }
        )
|
||
def calculate_cosine_similarity(self, item_idx):
    """Return cosine similarities between one item and every stored vector.

    Args:
        item_idx: Index into ``self.tfidf_matrix`` of the reference item.

    Returns:
        A list with one score per entry of ``self.tfidf_matrix``, in the
        same order.  The reference item's own slot is ``0``, and so is any
        slot where either vector has zero squared length.
    """
    target = self.tfidf_matrix[item_idx]
    target_sq = sum(weight ** 2 for weight in target.values())

    scores = []
    for position, candidate in enumerate(self.tfidf_matrix):
        if position == item_idx:
            # The item itself is never reported as its own neighbour.
            scores.append(0)
            continue

        # Only terms of the reference item can contribute to the dot product.
        overlap = sum(
            target[term] * candidate.get(term, 0) for term in target.keys()
        )
        candidate_sq = sum(weight ** 2 for weight in candidate.values())

        if target_sq > 0 and candidate_sq > 0:
            score = overlap / (math.sqrt(target_sq) * math.sqrt(candidate_sq))
        else:
            score = 0
        scores.append(score)

    return scores
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
class RecommendationEngine:
    """Base holder for the inputs a recommendation algorithm works with.

    The three constructor arguments are stored unchanged on the instance as
    ``data``, ``records`` and ``filters``.
    """

    def __init__(self, data, records, filters):
        self.data, self.records, self.filters = data, records, filters

    def recommend(self):
        """Placeholder for shared recommendation logic; returns ``None``."""
        return None
Oops, something went wrong.