Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement LLM-based recommenders #941

Merged
merged 6 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions logics/item_to_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ItemToItemOptions struct {
// ItemToItem is the interface implemented by item-to-item recommenders.
// NOTE(review): this span comes from a diff view, so both the pre-change and
// the post-change PopAll signatures are listed; a real Go interface can only
// contain one of them.
type ItemToItem interface {
// Items returns the items held by the recommender.
Items() []*data.Item
// Push hands an item and its feedback IDs to the recommender.
// NOTE(review): how feedback is used is not visible in this view — confirm in the implementations.
Push(item *data.Item, feedback []dataset.ID)
// Pre-change signature: calls callback once per item with that item's ID and its scores.
PopAll(callback func(itemId string, score []cache.Score))
// Post-change signature: returns the scores for the item at index i, or nil when the index search fails.
PopAll(i int) []cache.Score
}

func NewItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, opts *ItemToItemOptions) (ItemToItem, error) {
Expand Down Expand Up @@ -82,22 +82,20 @@ func (b *baseItemToItem[T]) Items() []*data.Item {
return b.items
}

func (b *baseItemToItem[T]) PopAll(callback func(itemId string, score []cache.Score)) {
for index, item := range b.items {
scores, err := b.index.SearchIndex(index, b.n+1, true)
if err != nil {
log.Logger().Error("failed to search index", zap.Error(err))
return
}
callback(item.ItemId, lo.Map(scores, func(v lo.Tuple2[int, float32], _ int) cache.Score {
return cache.Score{
Id: b.items[v.A].ItemId,
Categories: b.items[v.A].Categories,
Score: -float64(v.B),
Timestamp: b.timestamp,
}
}))
// PopAll returns the neighbors of the item stored at index i as cache scores.
//
// The index is searched for b.n+1 results for entry i. When the search fails,
// the error is logged and nil is returned. Every hit becomes a cache.Score that
// carries the matched item's ID and categories, the negated value reported by
// the index as its score, and the recommender's timestamp.
//
// NOTE(review): asking for n+1 results with the trailing `true` flag presumably
// accounts for the query item itself — confirm against SearchIndex.
func (b *baseItemToItem[T]) PopAll(i int) []cache.Score {
	neighbors, err := b.index.SearchIndex(i, b.n+1, true)
	if err != nil {
		log.Logger().Error("failed to search index", zap.Error(err))
		return nil
	}
	// Convert one (item index, value) hit into its cache representation.
	toScore := func(hit lo.Tuple2[int, float32], _ int) cache.Score {
		matched := b.items[hit.A]
		return cache.Score{
			Id:         matched.ItemId,
			Categories: matched.Categories,
			Score:      -float64(hit.B),
			Timestamp:  b.timestamp,
		}
	}
	return lo.Map(neighbors, toScore)
}

type embeddingItemToItem struct {
Expand Down
32 changes: 5 additions & 27 deletions logics/item_to_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/zhenghaoz/gorse/config"
"github.com/zhenghaoz/gorse/dataset"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
)

Expand Down Expand Up @@ -97,12 +96,7 @@ func (suite *ItemToItemTestSuite) TestEmbedding() {
}, nil)
}

var scores []cache.Score
item2item.PopAll(func(itemId string, score []cache.Score) {
if itemId == "0" {
scores = score
}
})
scores := item2item.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand Down Expand Up @@ -131,12 +125,7 @@ func (suite *ItemToItemTestSuite) TestTags() {
}, nil)
}

var scores []cache.Score
item2item.PopAll(func(itemId string, score []cache.Score) {
if itemId == "0" {
scores = score
}
})
scores := item2item.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand All @@ -160,12 +149,7 @@ func (suite *ItemToItemTestSuite) TestUsers() {
item2item.Push(&data.Item{ItemId: strconv.Itoa(i)}, feedback)
}

var scores []cache.Score
item2item.PopAll(func(itemId string, score []cache.Score) {
if itemId == "0" {
scores = score
}
})
scores := item2item.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand Down Expand Up @@ -198,18 +182,12 @@ func (suite *ItemToItemTestSuite) TestAuto() {
item2item.Push(item, feedback)
}

var scores0, scores1 []cache.Score
item2item.PopAll(func(itemId string, score []cache.Score) {
if itemId == "0" {
scores0 = score
} else if itemId == "1" {
scores1 = score
}
})
scores0 := item2item.PopAll(0)
suite.Len(scores0, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i*2), scores0[i-1].Id)
}
scores1 := item2item.PopAll(1)
suite.Len(scores1, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i*2+1), scores1[i-1].Id)
Expand Down
28 changes: 13 additions & 15 deletions logics/user_to_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type UserToUserOptions struct {
// UserToUser is the interface implemented by user-to-user recommenders.
// NOTE(review): this span comes from a diff view, so both the pre-change and
// the post-change PopAll signatures are listed; a real Go interface can only
// contain one of them.
type UserToUser interface {
// Users returns the users held by the recommender.
Users() []*data.User
// Push hands a user and its feedback IDs to the recommender.
// NOTE(review): how feedback is used is not visible in this view — confirm in the implementations.
Push(user *data.User, feedback []dataset.ID)
// Pre-change signature: calls callback once per user with that user's ID and its scores.
PopAll(callback func(userId string, score []cache.Score))
// Post-change signature: returns the scores for the user at index i, or nil when the index search fails.
PopAll(i int) []cache.Score
}

func NewUserToUser(cfg UserToUserConfig, n int, timestamp time.Time, opts *UserToUserOptions) (UserToUser, error) {
Expand Down Expand Up @@ -82,21 +82,19 @@ func (b *baseUserToUser[T]) Users() []*data.User {
return b.users
}

func (b *baseUserToUser[T]) PopAll(callback func(userId string, score []cache.Score)) {
for index, user := range b.users {
scores, err := b.index.SearchIndex(index, b.n+1, true)
if err != nil {
log.Logger().Error("failed to search index", zap.Error(err))
return
}
callback(user.UserId, lo.Map(scores, func(v lo.Tuple2[int, float32], _ int) cache.Score {
return cache.Score{
Id: b.users[v.A].UserId,
Score: -float64(v.B),
Timestamp: b.timestamp,
}
}))
// PopAll returns the neighbors of the user stored at index i as cache scores.
//
// The index is searched for b.n+1 results for entry i. When the search fails,
// the error is logged and nil is returned. Every hit becomes a cache.Score that
// carries the matched user's ID, the negated value reported by the index as its
// score, and the recommender's timestamp.
//
// NOTE(review): asking for n+1 results with the trailing `true` flag presumably
// accounts for the query user itself — confirm against SearchIndex.
func (b *baseUserToUser[T]) PopAll(i int) []cache.Score {
	neighbors, err := b.index.SearchIndex(i, b.n+1, true)
	if err != nil {
		log.Logger().Error("failed to search index", zap.Error(err))
		return nil
	}
	// Convert one (user index, value) hit into its cache representation.
	toScore := func(hit lo.Tuple2[int, float32], _ int) cache.Score {
		matched := b.users[hit.A]
		return cache.Score{
			Id:        matched.UserId,
			Score:     -float64(hit.B),
			Timestamp: b.timestamp,
		}
	}
	return lo.Map(neighbors, toScore)
}

type embeddingUserToUser struct {
Expand Down
32 changes: 5 additions & 27 deletions logics/user_to_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/stretchr/testify/suite"
"github.com/zhenghaoz/gorse/dataset"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
)

Expand All @@ -45,12 +44,7 @@ func (suite *UserToUserTestSuite) TestEmbedding() {
}, nil)
}

var scores []cache.Score
user2user.PopAll(func(userId string, score []cache.Score) {
if userId == "0" {
scores = score
}
})
scores := user2user.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand Down Expand Up @@ -79,12 +73,7 @@ func (suite *UserToUserTestSuite) TestTags() {
}, nil)
}

var scores []cache.Score
user2user.PopAll(func(userId string, score []cache.Score) {
if userId == "0" {
scores = score
}
})
scores := user2user.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand All @@ -108,12 +97,7 @@ func (suite *UserToUserTestSuite) TestItems() {
user2user.Push(&data.User{UserId: strconv.Itoa(i)}, feedback)
}

var scores []cache.Score
user2user.PopAll(func(userId string, score []cache.Score) {
if userId == "0" {
scores = score
}
})
scores := user2user.PopAll(0)
suite.Len(scores, 10)
for i := 1; i <= 10; i++ {
suite.Equal(strconv.Itoa(i), scores[i-1].Id)
Expand Down Expand Up @@ -146,15 +130,9 @@ func (suite *UserToUserTestSuite) TestAuto() {
user2user.Push(user, feedback)
}

var scores0, scores1 []cache.Score
user2user.PopAll(func(userId string, score []cache.Score) {
if userId == "0" {
scores0 = score
} else if userId == "1" {
scores1 = score
}
})
scores0 := user2user.PopAll(0)
suite.Len(scores0, 10)
scores1 := user2user.PopAll(1)
suite.Len(scores1, 10)
}

Expand Down
53 changes: 31 additions & 22 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,31 +1033,35 @@ func (m *Master) updateItemToItem(dataset *dataset.Dataset) error {

// Save item-to-item recommendations to cache
for i, recommender := range itemToItemRecommenders {
recommender.PopAll(func(itemId string, score []cache.Score) {
for j, item := range recommender.Items() {
itemToItemConfig := itemToItemConfigs[i]
if m.needUpdateItemToItem(itemId, itemToItemConfigs[i]) {
if m.needUpdateItemToItem(item.ItemId, itemToItemConfig) {
score := recommender.PopAll(j)
if score == nil {
continue
}
log.Logger().Debug("update item-to-item recommendation",
zap.String("item_id", itemId),
zap.String("item_id", item.ItemId),
zap.String("name", itemToItemConfig.Name),
zap.Int("n_recommendations", len(score)))
// Save item-to-item recommendation to cache
if err := m.CacheClient.AddScores(ctx, cache.ItemToItem, cache.Key(itemToItemConfig.Name, itemId), score); err != nil {
if err := m.CacheClient.AddScores(ctx, cache.ItemToItem, cache.Key(itemToItemConfig.Name, item.ItemId), score); err != nil {
log.Logger().Error("failed to save item-to-item recommendation to cache",
zap.String("item_id", itemId), zap.Error(err))
return
zap.String("item_id", item.ItemId), zap.Error(err))
continue
}
// Save item-to-item digest and last update time to cache
if err := m.CacheClient.Set(ctx,
cache.String(cache.Key(cache.ItemToItemDigest, itemToItemConfig.Name, itemId), itemToItemConfig.Hash()),
cache.Time(cache.Key(cache.ItemToItemUpdateTime, itemToItemConfig.Name, itemId), time.Now()),
cache.String(cache.Key(cache.ItemToItemDigest, itemToItemConfig.Name, item.ItemId), itemToItemConfig.Hash()),
cache.Time(cache.Key(cache.ItemToItemUpdateTime, itemToItemConfig.Name, item.ItemId), time.Now()),
); err != nil {
log.Logger().Error("failed to save item-to-item digest to cache",
zap.String("item_id", itemId), zap.Error(err))
return
zap.String("item_id", item.ItemId), zap.Error(err))
continue
}
}
span.Add(1)
})
}
}
return nil
}
Expand Down Expand Up @@ -1132,26 +1136,31 @@ func (m *Master) updateUserToUser(dataset *dataset.Dataset) error {
}

// Save user-to-user recommendations to cache
userToUserRecommender.PopAll(func(userId string, score []cache.Score) {
if m.needUpdateUserToUser(userId) {
for j, user := range userToUserRecommender.Users() {
if m.needUpdateUserToUser(user.UserId) {
score := userToUserRecommender.PopAll(j)
if score == nil {
continue
}
log.Logger().Debug("update user neighbors",
zap.String("user_id", userId),
zap.String("user_id", user.UserId),
zap.Int("n_recommendations", len(score)))
// Save user-to-user recommendations to cache
if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), score); err != nil {
log.Logger().Error("failed to save user neighbors to cache", zap.String("user_id", userId), zap.Error(err))
return
if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, user.UserId), score); err != nil {
log.Logger().Error("failed to save user neighbors to cache", zap.String("user_id", user.UserId), zap.Error(err))
continue
}
// Save user-to-user digest and last update time to cache
if err := m.CacheClient.Set(ctx,
cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest()),
cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()),
cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, user.UserId)), m.Config.UserNeighborDigest()),
cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, user.UserId)), time.Now()),
); err != nil {
log.Logger().Error("failed to save user neighbors digest to cache", zap.String("user_id", userId), zap.Error(err))
return
log.Logger().Error("failed to save user neighbors digest to cache", zap.String("user_id", user.UserId), zap.Error(err))
continue
}
}
})
span.Add(1)
}
return nil
}

Expand Down
Loading