diff --git a/client/client_test.go b/client/client_test.go index 43d841d0c..9511a8867 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -19,11 +19,13 @@ package client import ( "context" "encoding/base64" + "testing" + "time" + client "github.com/gorse-io/gorse-go" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/suite" - "testing" - "time" + "github.com/zhenghaoz/gorse/storage/cache" ) const ( @@ -108,22 +110,22 @@ func (suite *GorseClientTestSuite) TestRecommend() { func (suite *GorseClientTestSuite) TestSessionRecommend() { ctx := context.Background() - suite.hSet("item_neighbors", "1", []client.Score{ + suite.hSet("item-to-item", cache.Key(cache.Neighbors, "1"), []client.Score{ {Id: "2", Score: 100000}, {Id: "9", Score: 1}, }) - suite.hSet("item_neighbors", "2", []client.Score{ + suite.hSet("item-to-item", cache.Key(cache.Neighbors, "2"), []client.Score{ {Id: "3", Score: 100000}, {Id: "8", Score: 1}, {Id: "9", Score: 1}, }) - suite.hSet("item_neighbors", "3", []client.Score{ + suite.hSet("item-to-item", cache.Key(cache.Neighbors, "3"), []client.Score{ {Id: "4", Score: 100000}, {Id: "7", Score: 1}, {Id: "8", Score: 1}, {Id: "9", Score: 1}, }) - suite.hSet("item_neighbors", "4", []client.Score{ + suite.hSet("item-to-item", cache.Key(cache.Neighbors, "4"), []client.Score{ {Id: "1", Score: 100000}, {Id: "6", Score: 1}, {Id: "7", Score: 1}, @@ -179,7 +181,7 @@ func (suite *GorseClientTestSuite) TestSessionRecommend() { func (suite *GorseClientTestSuite) TestNeighbors() { ctx := context.Background() - suite.hSet("item_neighbors", "100", []client.Score{ + suite.hSet("item-to-item", cache.Key(cache.Neighbors, "100"), []client.Score{ {Id: "1", Score: 1}, {Id: "2", Score: 2}, {Id: "3", Score: 3}, diff --git a/common/ann/bruteforce.go b/common/ann/bruteforce.go index 1869da9ae..9a20fa4c1 100644 --- a/common/ann/bruteforce.go +++ b/common/ann/bruteforce.go @@ -22,15 +22,15 @@ import ( // Bruteforce is a naive implementation of vector index. type Bruteforce[T any] struct { - distanceFunc func(a, b []T) float32 - vectors [][]T + distanceFunc func(a, b T) float32 + vectors []T } -func NewBruteforce[T any](distanceFunc func(a, b []T) float32) *Bruteforce[T] { +func NewBruteforce[T any](distanceFunc func(a, b T) float32) *Bruteforce[T] { return &Bruteforce[T]{distanceFunc: distanceFunc} } -func (b *Bruteforce[T]) Add(v []T) (int, error) { +func (b *Bruteforce[T]) Add(v T) (int, error) { // Add vector b.vectors = append(b.vectors, v) return len(b.vectors), nil @@ -62,7 +62,7 @@ func (b *Bruteforce[T]) SearchIndex(q, k int, prune0 bool) ([]lo.Tuple2[int, flo return scores, nil } -func (b *Bruteforce[T]) SearchVector(q []T, k int, prune0 bool) []lo.Tuple2[int, float32] { +func (b *Bruteforce[T]) SearchVector(q T, k int, prune0 bool) []lo.Tuple2[int, float32] { // Search pq := heap.NewPriorityQueue(true) for i, vec := range b.vectors { diff --git a/common/ann/hnsw.go b/common/ann/hnsw.go index 481a17ac2..a2e9ffe6f 100644 --- a/common/ann/hnsw.go +++ b/common/ann/hnsw.go @@ -26,8 +26,8 @@ import ( // HNSW is a vector index based on Hierarchical Navigable Small Worlds. type HNSW[T any] struct { - distanceFunc func(a, b []T) float32 - vectors [][]T + distanceFunc func(a, b T) float32 + vectors []T bottomNeighbors []*heap.PriorityQueue upperNeighbors []map[int32]*heap.PriorityQueue enterPoint int32 @@ -40,7 +40,7 @@ type HNSW[T any] struct { efConstruction int } -func NewHNSW[T any](distanceFunc func(a, b []T) float32) *HNSW[T] { +func NewHNSW[T any](distanceFunc func(a, b T) float32) *HNSW[T] { return &HNSW[T]{ distanceFunc: distanceFunc, levelFactor: 1.0 / math32.Log(48), @@ -50,7 +50,7 @@ func NewHNSW[T any](distanceFunc func(a, b []T) float32) *HNSW[T] { } } -func (h *HNSW[T]) Add(v []T) (int, error) { +func (h *HNSW[T]) Add(v T) (int, error) { // Add vector h.vectors = append(h.vectors, v) h.bottomNeighbors = append(h.bottomNeighbors, heap.NewPriorityQueue(false)) @@ -70,7 +70,7 @@ func (h *HNSW[T]) SearchIndex(q, k int, prune0 bool) ([]lo.Tuple2[int, float32], return scores, nil } -func (h *HNSW[T]) SearchVector(q []T, k int, prune0 bool) []lo.Tuple2[int, float32] { +func (h *HNSW[T]) SearchVector(q T, k int, prune0 bool) []lo.Tuple2[int, float32] { w := h.knnSearch(q, k, h.efSearchValue(k)) scores := make([]lo.Tuple2[int, float32], 0) for w.Len() > 0 { @@ -82,7 +82,7 @@ func (h *HNSW[T]) SearchVector(q []T, k int, prune0 bool) []lo.Tuple2[int, float return scores } -func (h *HNSW[T]) knnSearch(q []T, k, ef int) *heap.PriorityQueue { +func (h *HNSW[T]) knnSearch(q T, k, ef int) *heap.PriorityQueue { var ( w *heap.PriorityQueue // set for the current the nearest element enterPoints = h.distance(q, []int32{h.enterPoint}) // get enter point for hnsw @@ -157,7 +157,7 @@ func (h *HNSW[T]) insert(q int32) { } } -func (h *HNSW[T]) searchLayer(q []T, enterPoints *heap.PriorityQueue, ef, currentLayer int) *heap.PriorityQueue { +func (h *HNSW[T]) searchLayer(q T, enterPoints *heap.PriorityQueue, ef, currentLayer int) *heap.PriorityQueue { var ( v = mapset.NewSet(enterPoints.Values()...) // set of visited elements candidates = enterPoints.Clone() // set of candidates @@ -210,7 +210,7 @@ func (h *HNSW[T]) getNeighbourhood(e int32, currentLayer int) *heap.PriorityQueu } } -func (h *HNSW[T]) selectNeighbors(_ []T, candidates *heap.PriorityQueue, m int) *heap.PriorityQueue { +func (h *HNSW[T]) selectNeighbors(_ T, candidates *heap.PriorityQueue, m int) *heap.PriorityQueue { pq := candidates.Reverse() for pq.Len() > m { pq.Pop() @@ -218,7 +218,7 @@ func (h *HNSW[T]) selectNeighbors(_ []T, candidates *heap.PriorityQueue, m int) return pq.Reverse() } -func (h *HNSW[T]) distance(q []T, points []int32) *heap.PriorityQueue { +func (h *HNSW[T]) distance(q T, points []int32) *heap.PriorityQueue { pq := heap.NewPriorityQueue(false) for _, point := range points { pq.Push(point, h.distanceFunc(h.vectors[point], q)) diff --git a/config/config.go b/config/config.go index 4f90ad006..dd58ee677 100644 --- a/config/config.go +++ b/config/config.go @@ -148,7 +148,7 @@ type NeighborsConfig struct { type ItemToItemConfig struct { Name string `mapstructure:"name" json:"name"` - Type string `mapstructure:"type" json:"type" validate:"oneof=embedding tags"` + Type string `mapstructure:"type" json:"type" validate:"oneof=embedding tags users"` Column string `mapstructure:"column" json:"column" validate:"item_expr"` } diff --git a/config/config.toml b/config/config.toml index d7deb9f9d..637865b7b 100644 --- a/config/config.toml +++ b/config/config.toml @@ -149,6 +149,20 @@ score = "count(feedback, .FeedbackType == 'star')" # The filter for items in the leaderboard. filter = "(now() - item.Timestamp).Hours() < 168" +# [[recommend.item-to-item]] + +# # The name of the item-to-item recommender. +# name = "similar_embedding" + +# # The type of the item-to-item recommender. There are three types: +# # embedding: recommend by Euclidean distance of embeddings. +# # tags: recommend by number of common tags. +# # users: recommend by number of common users. +# type = "embedding" + +# # The column of the item embeddings. Leave blank if type is "users". +# column = "item.Labels.embedding" + [recommend.user_neighbors] # The type of neighbors for users. There are three types: @@ -157,7 +171,7 @@ filter = "(now() - item.Timestamp).Hours() < 168" # auto: If a user have labels, neighbors are found by number of common labels. # If this user have no labels, neighbors are found by number of common liked items. # The default value is "auto". -neighbor_type = "similar" +neighbor_type = "related" [recommend.item_neighbors] diff --git a/config/config_test.go b/config/config_test.go index a4e355b05..18ded95fd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -109,7 +109,7 @@ func TestUnmarshal(t *testing.T) { assert.Equal(t, "count(feedback, .FeedbackType == 'star')", config.Recommend.NonPersonalized[0].Score) assert.Equal(t, "(now() - item.Timestamp).Hours() < 168", config.Recommend.NonPersonalized[0].Filter) // [recommend.user_neighbors] - assert.Equal(t, "similar", config.Recommend.UserNeighbors.NeighborType) + assert.Equal(t, "related", config.Recommend.UserNeighbors.NeighborType) // [recommend.item_neighbors] assert.Equal(t, "similar", config.Recommend.ItemNeighbors.NeighborType) // [recommend.collaborative] diff --git a/dataset/dataset.go b/dataset/dataset.go index b948a3657..aba119c66 100644 --- a/dataset/dataset.go +++ b/dataset/dataset.go @@ -15,28 +15,39 @@ package dataset import ( + "time" + "github.com/chewxy/math32" "github.com/samber/lo" "github.com/zhenghaoz/gorse/storage/data" "modernc.org/strutil" - "time" ) type ID int type Dataset struct { timestamp time.Time + users []data.User items []data.Item - columnNames *strutil.Pool - columnValues *FreqDict + userLabels *Labels + itemLabels *Labels + userFeedback [][]ID + itemFeedback [][]ID + userDict *FreqDict + itemDict *FreqDict } -func NewDataset(timestamp time.Time, itemCount int) *Dataset { +func NewDataset(timestamp time.Time, userCount, itemCount int) *Dataset { return &Dataset{ timestamp: timestamp, + users: make([]data.User, 0, userCount), items: make([]data.Item, 0, itemCount), - columnNames: strutil.NewPool(), - columnValues: NewFreqDict(), + userLabels: NewLabels(), + itemLabels: NewLabels(), + userFeedback: make([][]ID, userCount), + itemFeedback: make([][]ID, itemCount), + userDict: NewFreqDict(), + itemDict: NewFreqDict(), } } @@ -44,36 +55,123 @@ func (d *Dataset) GetTimestamp() time.Time { return d.timestamp } +func (d *Dataset) GetUsers() []data.User { + return d.users +} + func (d *Dataset) GetItems() []data.Item { return d.items } +func (d *Dataset) GetUserFeedback() [][]ID { + return d.userFeedback +} + +func (d *Dataset) GetItemFeedback() [][]ID { + return d.itemFeedback +} + +// GetUserIDF returns the IDF of users. +// +// IDF(u) = log(I/freq(u)) +// +// I is the number of items. +// freq(u) is the frequency of user u in all feedback. +func (d *Dataset) GetUserIDF() []float32 { + idf := make([]float32, d.userDict.Count()) + for i := 0; i < d.userDict.Count(); i++ { + // Since zero IDF will cause NaN in the future, we set the minimum value to 1e-3. + idf[i] = max(math32.Log(float32(len(d.items))/float32(d.userDict.Freq(i))), 1e-3) + } + return idf +} + +// GetItemIDF returns the IDF of items. +// +// IDF(i) = log(U/freq(i)) +// +// U is the number of users. +// freq(i) is the frequency of item i in all feedback. +func (d *Dataset) GetItemIDF() []float32 { + idf := make([]float32, d.itemDict.Count()) + for i := 0; i < d.itemDict.Count(); i++ { + // Since zero IDF will cause NaN in the future, we set the minimum value to 1e-3. + idf[i] = max(math32.Log(float32(len(d.users))/float32(d.itemDict.Freq(i))), 1e-3) + } + return idf +} + +func (d *Dataset) GetUserColumnValuesIDF() []float32 { + idf := make([]float32, d.userLabels.values.Count()) + for i := 0; i < d.userLabels.values.Count(); i++ { + // Since zero IDF will cause NaN in the future, we set the minimum value to 1e-3. + idf[i] = max(math32.Log(float32(len(d.users))/float32(d.userLabels.values.Freq(i))), 1e-3) + } + return idf +} + func (d *Dataset) GetItemColumnValuesIDF() []float32 { - idf := make([]float32, d.columnValues.Count()) - for i := 0; i < d.columnValues.Count(); i++ { + idf := make([]float32, d.itemLabels.values.Count()) + for i := 0; i < d.itemLabels.values.Count(); i++ { // Since zero IDF will cause NaN in the future, we set the minimum value to 1e-3. - idf[i] = max(math32.Log(float32(len(d.items)/(d.columnValues.Freq(i)))), 1e-3) + idf[i] = max(math32.Log(float32(len(d.items))/float32(d.itemLabels.values.Freq(i))), 1e-3) } return idf } +func (d *Dataset) AddUser(user data.User) { + d.users = append(d.users, data.User{ + UserId: user.UserId, + Labels: d.userLabels.processLabels(user.Labels, ""), + Subscribe: user.Subscribe, + Comment: user.Comment, + }) + d.userDict.NotCount(user.UserId) + if len(d.userFeedback) < len(d.users) { + d.userFeedback = append(d.userFeedback, nil) + } +} + func (d *Dataset) AddItem(item data.Item) { d.items = append(d.items, data.Item{ ItemId: item.ItemId, IsHidden: item.IsHidden, Categories: item.Categories, Timestamp: item.Timestamp, - Labels: d.processLabels(item.Labels, ""), + Labels: d.itemLabels.processLabels(item.Labels, ""), Comment: item.Comment, }) + d.itemDict.NotCount(item.ItemId) + if len(d.itemFeedback) < len(d.items) { + d.itemFeedback = append(d.itemFeedback, nil) + } +} + +func (d *Dataset) AddFeedback(userId, itemId string) { + userIndex := d.userDict.Id(userId) + itemIndex := d.itemDict.Id(itemId) + d.userFeedback[userIndex] = append(d.userFeedback[userIndex], ID(itemIndex)) + d.itemFeedback[itemIndex] = append(d.itemFeedback[itemIndex], ID(userIndex)) +} + +type Labels struct { + fields *strutil.Pool + values *FreqDict +} + +func NewLabels() *Labels { + return &Labels{ + fields: strutil.NewPool(), + values: NewFreqDict(), + } } -func (d *Dataset) processLabels(labels any, parent string) any { +func (l *Labels) processLabels(labels any, parent string) any { switch typed := labels.(type) { case map[string]any: o := make(map[string]any) for k, v := range typed { - o[d.columnNames.Align(k)] = d.processLabels(v, parent+"."+k) + o[l.fields.Align(k)] = l.processLabels(v, parent+"."+k) } return o case []any: @@ -83,12 +181,12 @@ func (d *Dataset) processLabels(labels any, parent string) any { }) } else if isSliceOf[string](typed) { return lo.Map(typed, func(e any, _ int) ID { - return ID(d.columnValues.Id(parent + ":" + e.(string))) + return ID(l.values.Id(parent + ":" + e.(string))) }) } return typed case string: - return ID(d.columnValues.Id(parent + ":" + typed)) + return ID(l.values.Id(parent + ":" + typed)) default: return labels } diff --git a/dataset/dataset_test.go b/dataset/dataset_test.go index eced19788..145985200 100644 --- a/dataset/dataset_test.go +++ b/dataset/dataset_test.go @@ -15,15 +15,17 @@ package dataset import ( + "strconv" + "testing" + "time" + "github.com/chewxy/math32" "github.com/stretchr/testify/assert" "github.com/zhenghaoz/gorse/storage/data" - "testing" - "time" ) func TestDataset_AddItem(t *testing.T) { - dataSet := NewDataset(time.Now(), 1) + dataSet := NewDataset(time.Now(), 0, 1) dataSet.AddItem(data.Item{ ItemId: "1", IsHidden: false, @@ -58,7 +60,7 @@ func TestDataset_AddItem(t *testing.T) { Labels: map[string]any{ "a": 1, "embedded": []float32{1.1, 2.2, 3.3}, - "tags": []ID{1, 2, 3}, + "tags": []ID{0, 1, 2}, }, Comment: "comment", }, dataSet.GetItems()[0]) @@ -70,15 +72,15 @@ func TestDataset_AddItem(t *testing.T) { Labels: map[string]any{ "a": 1, "embedded": []float32{1.1, 2.2, 3.3}, - "tags": []ID{2, 3, 1}, - "topics": []ID{4, 5, 6}, + "tags": []ID{1, 2, 0}, + "topics": []ID{3, 4, 5}, }, Comment: "comment", }, dataSet.GetItems()[1]) } func TestDataset_GetItemColumnValuesIDF(t *testing.T) { - dataSet := NewDataset(time.Now(), 1) + dataSet := NewDataset(time.Now(), 0, 1) dataSet.AddItem(data.Item{ ItemId: "1", IsHidden: false, @@ -100,7 +102,71 @@ func TestDataset_GetItemColumnValuesIDF(t *testing.T) { Comment: "comment", }) idf := dataSet.GetItemColumnValuesIDF() - assert.Len(t, idf, 5) - assert.InDelta(t, 1e-3, idf[1], 1e-6) - assert.InDelta(t, math32.Log(2), idf[2], 1e-6) + assert.Len(t, idf, 4) + assert.InDelta(t, 1e-3, idf[0], 1e-6) + assert.InDelta(t, math32.Log(2), idf[1], 1e-6) +} + +func TestDataset_AddUser(t *testing.T) { + dataSet := NewDataset(time.Now(), 1, 0) + dataSet.AddUser(data.User{ + UserId: "1", + Labels: map[string]any{"a": 1, "b": "a"}, + Comment: "comment", + }) + assert.Len(t, dataSet.users, 1) + assert.Equal(t, data.User{ + UserId: "1", + Labels: map[string]any{"a": 1, "b": ID(0)}, + Comment: "comment", + }, dataSet.users[0]) +} + +func TestDataset_GetUserColumnValuesIDF(t *testing.T) { + dataSet := NewDataset(time.Now(), 1, 0) + dataSet.AddUser(data.User{ + UserId: "1", + Labels: map[string]any{ + "tags": []any{"a", "b", "c"}, + }, + Comment: "comment", + }) + dataSet.AddUser(data.User{ + UserId: "2", + Labels: map[string]any{ + "tags": []any{"a", "e"}, + }, + Comment: "comment", + }) + idf := dataSet.GetUserColumnValuesIDF() + assert.Len(t, idf, 4) + assert.InDelta(t, 1e-3, idf[0], 1e-6) + assert.InDelta(t, math32.Log(2), idf[1], 1e-6) +} + +func TestDataset_AddFeedback(t *testing.T) { + dataSet := NewDataset(time.Now(), 10, 10) + for i := 0; i < 10; i++ { + dataSet.AddUser(data.User{ + UserId: strconv.Itoa(i), + }) + } + for i := 0; i < 10; i++ { + dataSet.AddItem(data.Item{ + ItemId: strconv.Itoa(i), + }) + } + for i := 0; i < 10; i++ { + for j := i; j < 10; j++ { + dataSet.AddFeedback(strconv.Itoa(i), strconv.Itoa(j)) + } + } + userIDF := dataSet.GetUserIDF() + itemIDF := dataSet.GetItemIDF() + for i := 0; i < 10; i++ { + assert.Len(t, dataSet.GetUserFeedback()[i], 10-i) + assert.Len(t, dataSet.GetItemFeedback()[i], i+1) + assert.InDelta(t, math32.Log(float32(10)/float32(10-i)), userIDF[i], 1e-2) + assert.InDelta(t, math32.Log(float32(10)/float32(i+1)), itemIDF[i], 1e-2) + } } diff --git a/dataset/dict.go b/dataset/dict.go index 18c72c86d..3d015be99 100644 --- a/dataset/dict.go +++ b/dataset/dict.go @@ -22,7 +22,6 @@ type FreqDict struct { func NewFreqDict() (d *FreqDict) { d = &FreqDict{map[string]int{}, []string{}, []int{}} - d.Id("") return } @@ -43,6 +42,18 @@ func (d *FreqDict) Id(s string) (y int) { return } +func (d *FreqDict) NotCount(s string) (y int) { + if y, ok := d.si[s]; ok { + return y + } + + y = len(d.is) + d.si[s] = y + d.is = append(d.is, s) + d.cnt = append(d.cnt, 0) + return +} + func (d *FreqDict) String(id int) (s string, ok bool) { if id >= len(d.is) { return "", false diff --git a/dataset/dict_test.go b/dataset/dict_test.go index 10af2c999..25e8f637d 100644 --- a/dataset/dict_test.go +++ b/dataset/dict_test.go @@ -15,21 +15,21 @@ package dataset import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestFreqDict(t *testing.T) { dict := NewFreqDict() - assert.Equal(t, 0, dict.Id("")) - assert.Equal(t, 1, dict.Id("a")) - assert.Equal(t, 2, dict.Id("b")) - assert.Equal(t, 2, dict.Id("b")) - assert.Equal(t, 3, dict.Id("c")) - assert.Equal(t, 3, dict.Id("c")) - assert.Equal(t, 3, dict.Id("c")) - assert.Equal(t, 4, dict.Count()) - assert.Equal(t, 1, dict.Freq(1)) - assert.Equal(t, 2, dict.Freq(2)) - assert.Equal(t, 3, dict.Freq(3)) + assert.Equal(t, 0, dict.Id("a")) + assert.Equal(t, 1, dict.Id("b")) + assert.Equal(t, 1, dict.Id("b")) + assert.Equal(t, 2, dict.Id("c")) + assert.Equal(t, 2, dict.Id("c")) + assert.Equal(t, 2, dict.Id("c")) + assert.Equal(t, 3, dict.Count()) + assert.Equal(t, 1, dict.Freq(0)) + assert.Equal(t, 2, dict.Freq(1)) + assert.Equal(t, 3, dict.Freq(2)) } diff --git a/go.mod b/go.mod index 5d67721c4..ad7191e11 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 github.com/google/uuid v1.6.0 github.com/gorilla/securecookie v1.1.1 - github.com/gorse-io/dashboard v0.0.0-20250101053324-8d40fd3b3a1c + github.com/gorse-io/dashboard v0.0.0-20250125070654-fba9eb31ccea github.com/gorse-io/gorse-go v0.5.0-alpha.1 github.com/haxii/go-swagger-ui v0.0.0-20210203093335-a63a6bbde946 github.com/jaswdr/faker v1.16.0 diff --git a/go.sum b/go.sum index e4f46e82e..46a8f1ac2 100644 --- a/go.sum +++ b/go.sum @@ -303,8 +303,8 @@ github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyC github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb h1:z/oOWE+Vy0PLcwIulZmIug4FtmvE3dJ1YOGprLeHwwY= github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb/go.mod h1:iILWzbul8U+gsf4kqbheF2QzBmdvVp63mloGGK8emDI= -github.com/gorse-io/dashboard v0.0.0-20250101053324-8d40fd3b3a1c h1:2G3W2QefCQqnAz6UiKFd0SQM5aG8KPPzJT1eakODFMY= -github.com/gorse-io/dashboard v0.0.0-20250101053324-8d40fd3b3a1c/go.mod h1:6h/3EYChEyiynyCMMDsCsDEVBSOPLSo1L/+aHqj9kdc= +github.com/gorse-io/dashboard v0.0.0-20250125070654-fba9eb31ccea h1:ZJkKE4ZSfJOGbbmqsRwgwPYwOPUMNL3lixBpfqRffpc= +github.com/gorse-io/dashboard v0.0.0-20250125070654-fba9eb31ccea/go.mod h1:6h/3EYChEyiynyCMMDsCsDEVBSOPLSo1L/+aHqj9kdc= github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849 h1:Hwywr6NxzYeZYn35KwOsw7j8ZiMT60TBzpbn1MbEido= github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849/go.mod h1:TtVGAt7ENNmgBnC0JA68CAjIDCEtcqaRHvnkAWJ/Fu0= github.com/gorse-io/gorse-go v0.5.0-alpha.1 h1:QBWKGAbSKNAWnieXVIdQiE0lLGvKXfFFAFPOQEkPW/E= diff --git a/logics/item_to_item.go b/logics/item_to_item.go index 3bc2fe604..737b4509b 100644 --- a/logics/item_to_item.go +++ b/logics/item_to_item.go @@ -16,6 +16,9 @@ package logics import ( "errors" + "sort" + "time" + "github.com/chewxy/math32" mapset "github.com/deckarep/golang-set/v2" "github.com/expr-lang/expr" @@ -29,22 +32,38 @@ import ( "github.com/zhenghaoz/gorse/storage/cache" "github.com/zhenghaoz/gorse/storage/data" "go.uber.org/zap" - "sort" - "time" ) +type ItemToItemOptions struct { + TagsIDF []float32 + UsersIDF []float32 +} + type ItemToItem interface { - Items() []string - Push(item data.Item) + Items() []*data.Item + Push(item *data.Item, feedback []dataset.ID) PopAll(callback func(itemId string, score []cache.Score)) } -func NewItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, idf []float32) (ItemToItem, error) { +func NewItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, opts *ItemToItemOptions) (ItemToItem, error) { switch cfg.Type { case "embedding": return newEmbeddingItemToItem(cfg, n, timestamp) case "tags": - return newTagsItemToItem(cfg, n, timestamp, idf) + if opts == nil || opts.TagsIDF == nil { + return nil, errors.New("tags IDF is required for tags item-to-item") + } + return newTagsItemToItem(cfg, n, timestamp, opts.TagsIDF) + case "users": + if opts == nil || opts.UsersIDF == nil { + return nil, errors.New("users IDF is required for users item-to-item") + } + return newUsersItemToItem(cfg, n, timestamp, opts.UsersIDF) + case "auto": + if opts == nil || opts.TagsIDF == nil || opts.UsersIDF == nil { + return nil, errors.New("tags and users IDF are required for auto item-to-item") + } + return newAutoItemToItem(cfg, n, timestamp, opts.TagsIDF, opts.UsersIDF) default: return nil, errors.New("invalid item-to-item type") } @@ -56,10 +75,10 @@ type baseItemToItem[T any] struct { timestamp time.Time columnFunc *vm.Program index *ann.HNSW[T] - items []string + items []*data.Item } -func (b *baseItemToItem[T]) Items() []string { +func (b *baseItemToItem[T]) Items() []*data.Item { return b.items } @@ -70,18 +89,19 @@ func (b *baseItemToItem[T]) PopAll(callback func(itemId string, score []cache.Sc log.Logger().Error("failed to search index", zap.Error(err)) return } - callback(item, lo.Map(scores, func(v lo.Tuple2[int, float32], _ int) cache.Score { + callback(item.ItemId, lo.Map(scores, func(v lo.Tuple2[int, float32], _ int) cache.Score { return cache.Score{ - Id: b.items[v.A], - Score: float64(v.B), - Timestamp: b.timestamp, + Id: b.items[v.A].ItemId, + Categories: b.items[v.A].Categories, + Score: -float64(v.B), + Timestamp: b.timestamp, } })) } } type embeddingItemToItem struct { - baseItemToItem[float32] + baseItemToItem[[]float32] dimension int } @@ -93,16 +113,16 @@ func newEmbeddingItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.T if err != nil { return nil, err } - return &embeddingItemToItem{baseItemToItem: baseItemToItem[float32]{ + return &embeddingItemToItem{baseItemToItem: baseItemToItem[[]float32]{ name: cfg.Name, n: n, timestamp: timestamp, columnFunc: columnFunc, - index: ann.NewHNSW[float32](floats.Euclidean), + index: ann.NewHNSW[[]float32](floats.Euclidean), }}, nil } -func (e *embeddingItemToItem) Push(item data.Item) { +func (e *embeddingItemToItem) Push(item *data.Item, _ []dataset.ID) { // Check if hidden if item.IsHidden { return @@ -130,7 +150,7 @@ func (e *embeddingItemToItem) Push(item data.Item) { return } // Push item - e.items = append(e.items, item.ItemId) + e.items = append(e.items, item) _, err = e.index.Add(v) if err != nil { log.Logger().Error("failed to add item to index", zap.Error(err)) @@ -139,8 +159,8 @@ func (e *embeddingItemToItem) Push(item data.Item) { } type tagsItemToItem struct { - baseItemToItem[dataset.ID] - idf []float32 + baseItemToItem[[]dataset.ID] + IDF } func newTagsItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, idf []float32) (ItemToItem, error) { @@ -151,20 +171,18 @@ func newTagsItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, if err != nil { return nil, err } - t := &tagsItemToItem{} - b := baseItemToItem[dataset.ID]{ + t := &tagsItemToItem{IDF: idf} + t.baseItemToItem = baseItemToItem[[]dataset.ID]{ name: cfg.Name, n: n, timestamp: timestamp, columnFunc: columnFunc, - index: ann.NewHNSW[dataset.ID](t.distance), + index: ann.NewHNSW[[]dataset.ID](t.distance), } - t.baseItemToItem = b - t.idf = idf return t, nil } -func (t *tagsItemToItem) Push(item data.Item) { +func (t *tagsItemToItem) Push(item *data.Item, _ []dataset.ID) { // Check if hidden if item.IsHidden { return @@ -180,13 +198,13 @@ func (t *tagsItemToItem) Push(item data.Item) { } // Extract tags tSet := mapset.NewSet[dataset.ID]() - t.flatten(result, tSet) + flatten(result, tSet) v := tSet.ToSlice() sort.Slice(v, func(i, j int) bool { return v[i] < v[j] }) // Push item - t.items = append(t.items, item.ItemId) + t.items = append(t.items, item) _, err = t.index.Add(v) if err != nil { log.Logger().Error("failed to add item to index", zap.Error(err)) @@ -194,16 +212,104 @@ func (t *tagsItemToItem) Push(item data.Item) { } } -func (t *tagsItemToItem) distance(a, b []dataset.ID) float32 { - commonSum, commonCount := t.weightedSumCommonElements(a, b) +type usersItemToItem struct { + baseItemToItem[[]dataset.ID] + IDF +} + +func newUsersItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, idf []float32) (ItemToItem, error) { + if cfg.Column != "" { + return nil, errors.New("column is not supported in users item-to-item") + } + u := &usersItemToItem{IDF: idf} + u.baseItemToItem = baseItemToItem[[]dataset.ID]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + index: ann.NewHNSW[[]dataset.ID](u.distance), + } + return u, nil +} + +func (u *usersItemToItem) Push(item *data.Item, feedback []dataset.ID) { + // Check if hidden + if item.IsHidden { + return + } + // Sort feedback + sort.Slice(feedback, func(i, j int) bool { + return feedback[i] < feedback[j] + }) + // Push item + u.items = append(u.items, item) + _, err := u.index.Add(feedback) + if err != nil { + log.Logger().Error("failed to add item to index", zap.Error(err)) + return + } +} + +type autoItemToItem struct { + baseItemToItem[lo.Tuple2[[]dataset.ID, []dataset.ID]] + tIDF IDF + uIDF IDF +} + +func newAutoItemToItem(cfg config.ItemToItemConfig, n int, timestamp time.Time, tIDF, uIDF []float32) (ItemToItem, error) { + a := &autoItemToItem{ + tIDF: tIDF, + uIDF: uIDF, + } + a.baseItemToItem = baseItemToItem[lo.Tuple2[[]dataset.ID, []dataset.ID]]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + index: ann.NewHNSW[lo.Tuple2[[]dataset.ID, []dataset.ID]](a.distance), + } + return a, nil +} + +func (a *autoItemToItem) Push(item *data.Item, feedback []dataset.ID) { + // Check if hidden + if item.IsHidden { + return + } + // Extract tags + tSet := mapset.NewSet[dataset.ID]() + flatten(item.Labels, tSet) + v := tSet.ToSlice() + sort.Slice(v, func(i, j int) bool { + return v[i] < v[j] + }) + // Sort feedback + sort.Slice(feedback, func(i, j int) bool { + return feedback[i] < feedback[j] + }) + // Push item + a.items = append(a.items, item) + _, err := a.index.Add(lo.Tuple2[[]dataset.ID, []dataset.ID]{A: v, B: feedback}) + if err != nil { + log.Logger().Error("failed to add item to index", zap.Error(err)) + return + } +} + +func (a *autoItemToItem) distance(u, v lo.Tuple2[[]dataset.ID, []dataset.ID]) float32 { + return (a.tIDF.distance(u.A, v.A) + a.uIDF.distance(u.B, v.B)) / 2 +} + +type IDF []float32 + +func (idf IDF) distance(a, b []dataset.ID) float32 { + commonSum, commonCount := idf.weightedSumCommonElements(a, b) if len(a) == len(b) && commonCount == float32(len(a)) { // If two items have the same tags, its distance is zero. return 0 - } else if commonCount > 0 { + } else if commonCount > 0 && len(a) > 0 && len(b) > 0 { // Add shrinkage to avoid division by zero return 1 - commonSum*commonCount/ - math32.Sqrt(t.weightedSum(a))/ - math32.Sqrt(t.weightedSum(b))/ + math32.Sqrt(idf.weightedSum(a))/ + math32.Sqrt(idf.weightedSum(b))/ (commonCount+100) } else { // If two items have no common tags, its distance is one. @@ -211,13 +317,11 @@ func (t *tagsItemToItem) distance(a, b []dataset.ID) float32 { } } -func (t *tagsItemToItem) weightedSumCommonElements(a, b []dataset.ID) (float32, float32) { +func (idf IDF) weightedSumCommonElements(a, b []dataset.ID) (float32, float32) { i, j, sum, count := 0, 0, float32(0), float32(0) for i < len(a) && j < len(b) { if a[i] == b[j] { - if a[i] >= 0 && int(a[i]) < len(t.idf) { - sum += t.idf[a[i]] - } + sum += idf[a[i]] count++ i++ j++ @@ -230,17 +334,15 @@ func (t *tagsItemToItem) weightedSumCommonElements(a, b []dataset.ID) (float32, return sum, count } -func (t *tagsItemToItem) weightedSum(a []dataset.ID) float32 { +func (idf IDF) weightedSum(a []dataset.ID) float32 { var sum float32 for _, i := range a { - if i >= 0 && int(i) < len(t.idf) { - sum += t.idf[i] - } + sum += idf[i] } return sum } -func (t *tagsItemToItem) flatten(o any, tSet mapset.Set[dataset.ID]) { +func flatten(o any, tSet mapset.Set[dataset.ID]) { switch typed := o.(type) { case dataset.ID: tSet.Add(typed) @@ -250,7 +352,7 @@ func (t *tagsItemToItem) flatten(o any, tSet mapset.Set[dataset.ID]) { return case map[string]any: for _, v := range typed { - t.flatten(v, tSet) + flatten(v, tSet) } } } diff --git a/logics/item_to_item_test.go b/logics/item_to_item_test.go index 3d59c0ece..70a6864ce 100644 --- a/logics/item_to_item_test.go +++ b/logics/item_to_item_test.go @@ -15,81 +15,86 @@ package logics import ( - "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/suite" "github.com/zhenghaoz/gorse/config" "github.com/zhenghaoz/gorse/dataset" "github.com/zhenghaoz/gorse/storage/cache" "github.com/zhenghaoz/gorse/storage/data" - "strconv" - "testing" - "time" ) -func TestColumnFunc(t *testing.T) { +type ItemToItemTestSuite struct { + suite.Suite +} + +func (suite *ItemToItemTestSuite) TestColumnFunc() { item2item, err := newEmbeddingItemToItem(config.ItemToItemConfig{ Column: "item.Labels.description", }, 10, time.Now()) - assert.NoError(t, err) + suite.NoError(err) // Push success - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: "1", Labels: map[string]any{ "description": []float32{0.1, 0.2, 0.3}, }, - }) - assert.Len(t, item2item.Items(), 1) + }, nil) + suite.Len(item2item.Items(), 1) // Hidden - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: "2", IsHidden: true, Labels: map[string]any{ "description": []float32{0.1, 0.2, 0.3}, }, - }) - assert.Len(t, item2item.Items(), 1) + }, nil) + suite.Len(item2item.Items(), 1) // Dimension does not match - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: "1", Labels: map[string]any{ "description": []float32{0.1, 0.2}, }, - }) - assert.Len(t, item2item.Items(), 1) + }, nil) + suite.Len(item2item.Items(), 1) // Type does not match - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: "1", Labels: map[string]any{ "description": "hello", }, - }) - assert.Len(t, item2item.Items(), 1) + }, nil) + suite.Len(item2item.Items(), 1) // Column does not exist - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: "2", Labels: []float32{0.1, 0.2, 0.3}, - }) - assert.Len(t, item2item.Items(), 1) + }, nil) + suite.Len(item2item.Items(), 1) } -func TestEmbedding(t *testing.T) { +func (suite *ItemToItemTestSuite) TestEmbedding() { timestamp := time.Now() item2item, err := newEmbeddingItemToItem(config.ItemToItemConfig{ Column: "item.Labels.description", }, 10, timestamp) - assert.NoError(t, err) + suite.NoError(err) for i := 0; i < 100; i++ { - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: strconv.Itoa(i), Labels: map[string]any{ "description": []float32{0.1 * float32(i), 0.2 * float32(i), 0.3 * float32(i)}, }, - }) + }, nil) } var scores []cache.Score @@ -98,13 +103,13 @@ func TestEmbedding(t *testing.T) { scores = score } }) - assert.Len(t, scores, 10) + suite.Len(scores, 10) for i := 1; i <= 10; i++ { - assert.Equal(t, strconv.Itoa(i), scores[i-1].Id) + suite.Equal(strconv.Itoa(i), scores[i-1].Id) } } -func TestTags(t *testing.T) { +func (suite *ItemToItemTestSuite) TestTags() { timestamp := time.Now() idf := make([]float32, 101) for i := range idf { @@ -113,17 +118,17 @@ func TestTags(t *testing.T) { item2item, err := newTagsItemToItem(config.ItemToItemConfig{ Column: "item.Labels", }, 10, timestamp, idf) - assert.NoError(t, err) + suite.NoError(err) for i := 0; i < 100; i++ { labels := make(map[string]any) for j := 1; j <= 100-i; j++ { labels[strconv.Itoa(j)] = []dataset.ID{dataset.ID(j)} } - item2item.Push(data.Item{ + item2item.Push(&data.Item{ ItemId: strconv.Itoa(i), Labels: labels, - }) + }, nil) } var scores []cache.Score @@ -132,8 +137,85 @@ func TestTags(t *testing.T) { scores = score } }) - assert.Len(t, scores, 10) + suite.Len(scores, 10) for i := 1; i <= 10; i++ { - assert.Equal(t, strconv.Itoa(i), scores[i-1].Id) + suite.Equal(strconv.Itoa(i), scores[i-1].Id) } } + +func (suite *ItemToItemTestSuite) TestUsers() { + timestamp := time.Now() + idf := make([]float32, 101) + for i := range idf { + idf[i] = 1 + } + item2item, err := newUsersItemToItem(config.ItemToItemConfig{}, 10, timestamp, idf) + suite.NoError(err) + + for i := 0; i < 100; i++ { + feedback := make([]dataset.ID, 0, 100-i) + for j := 1; j <= 100-i; j++ { + feedback = append(feedback, dataset.ID(j)) + } + item2item.Push(&data.Item{ItemId: strconv.Itoa(i)}, feedback) + } + + var scores []cache.Score + item2item.PopAll(func(itemId string, score []cache.Score) { + if itemId == "0" { + scores = score + } + }) + suite.Len(scores, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i), scores[i-1].Id) + } +} + +func (suite *ItemToItemTestSuite) TestAuto() { + timestamp := time.Now() + idf := make([]float32, 101) + for i := range idf { + idf[i] = 1 + } + item2item, err := newAutoItemToItem(config.ItemToItemConfig{}, 10, timestamp, idf, idf) + suite.NoError(err) + + for i := 0; i < 100; i++ { + item := &data.Item{ItemId: strconv.Itoa(i)} + feedback := make([]dataset.ID, 0, 100-i) + if i%2 == 0 { + labels := make(map[string]any) + for j := 1; j <= 100-i; j++ { + labels[strconv.Itoa(j)] = []dataset.ID{dataset.ID(j)} + } + item.Labels = labels + } else { + for j := 1; j <= 100-i; j++ { + feedback = append(feedback, dataset.ID(j)) + } + } + item2item.Push(item, feedback) + } + + var scores0, scores1 []cache.Score + item2item.PopAll(func(itemId string, score []cache.Score) { + if itemId == "0" { + scores0 = score + } else if itemId == "1" { + scores1 = score + } + }) + suite.Len(scores0, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i*2), scores0[i-1].Id) + } + suite.Len(scores1, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i*2+1), scores1[i-1].Id) + } +} + +func TestItemToItem(t *testing.T) { + suite.Run(t, new(ItemToItemTestSuite)) +} diff --git a/logics/user_to_user.go b/logics/user_to_user.go new file mode 100644 index 000000000..9fb9c2a8d --- /dev/null +++ b/logics/user_to_user.go @@ -0,0 +1,277 @@ +// Copyright 2025 gorse Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logics + +import ( + "sort" + "time" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" + "github.com/juju/errors" + "github.com/samber/lo" + "github.com/zhenghaoz/gorse/base/floats" + "github.com/zhenghaoz/gorse/base/log" + "github.com/zhenghaoz/gorse/common/ann" + "github.com/zhenghaoz/gorse/config" + "github.com/zhenghaoz/gorse/dataset" + "github.com/zhenghaoz/gorse/storage/cache" + "github.com/zhenghaoz/gorse/storage/data" + "go.uber.org/zap" +) + +type UserToUserConfig config.ItemToItemConfig + +type UserToUserOptions struct { + TagsIDF []float32 + ItemsIDF []float32 +} + +type UserToUser interface { + Users() []*data.User + Push(user *data.User, feedback []dataset.ID) + PopAll(callback func(userId string, score []cache.Score)) +} + +func NewUserToUser(cfg UserToUserConfig, n int, timestamp time.Time, opts *UserToUserOptions) (UserToUser, error) { + switch cfg.Type { + case "embedding": + return newEmbeddingUserToUser(cfg, n, timestamp) + case "tags": + if opts == nil || opts.TagsIDF == nil { + return nil, errors.New("tags IDF is required for tags user-to-user") + } + return newTagsUserToUser(cfg, n, timestamp, opts.TagsIDF) + case "items": + if opts == nil || opts.ItemsIDF == nil { + return nil, errors.New("items IDF is required for items user-to-user") + } + return newItemsUserToUser(cfg, n, timestamp, opts.ItemsIDF) + case "auto": + if opts == nil || opts.TagsIDF == nil || opts.ItemsIDF == nil { + return nil, errors.New("tags IDF and items IDF are required for auto user-to-user") + } + return newAutoUserToUser(cfg, n, timestamp, opts.TagsIDF, opts.ItemsIDF) + } + return nil, errors.New("unknown user-to-user method") +} + +type baseUserToUser[T any] struct { + name string + n int + timestamp time.Time + columnFunc *vm.Program + index *ann.HNSW[T] + users []*data.User +} + +func (b *baseUserToUser[T]) Users() []*data.User { + return b.users +} + +func (b *baseUserToUser[T]) PopAll(callback func(userId string, score []cache.Score)) { + for index, user := range b.users { + scores, err := b.index.SearchIndex(index, b.n+1, true) + if err != nil { + log.Logger().Error("failed to search index", zap.Error(err)) + return + } + callback(user.UserId, lo.Map(scores, func(v lo.Tuple2[int, float32], _ int) cache.Score { + return cache.Score{ + Id: b.users[v.A].UserId, + Score: -float64(v.B), + Timestamp: b.timestamp, + } + })) + } +} + +type embeddingUserToUser struct { + baseUserToUser[[]float32] + dimension int +} + +func newEmbeddingUserToUser(cfg UserToUserConfig, n int, timestamp time.Time) (UserToUser, error) { + // Compile column expression + columnFunc, err := expr.Compile(cfg.Column, expr.Env(map[string]any{ + "user": data.User{}, + })) + if err != nil { + return nil, err + } + return &embeddingUserToUser{baseUserToUser: baseUserToUser[[]float32]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + columnFunc: columnFunc, + index: ann.NewHNSW[[]float32](floats.Euclidean), + users: []*data.User{}, + }}, nil +} + +func (e *embeddingUserToUser) Push(user *data.User, feedback []dataset.ID) { + // Evaluate filter function + result, err := expr.Run(e.columnFunc, map[string]any{ + "user": user, + }) + if err != nil { + log.Logger().Error("failed to evaluate column expression", zap.Error(err)) + return + } + // Check column type + v, ok := result.([]float32) + if !ok { + log.Logger().Error("invalid column type", zap.Any("column", result)) + return + } + // Check dimension + if e.dimension == 0 && len(v) > 0 { + e.dimension = len(v) + } else if e.dimension != len(v) { + log.Logger().Error("invalid dimension", zap.Int("expected", e.dimension), zap.Int("actual", len(v))) + return + } + // Push user + e.users = append(e.users, user) + _, err = e.index.Add(v) + if err != nil { + log.Logger().Error("failed to add user to index", zap.Error(err)) + } +} + +type tagsUserToUser struct { + baseUserToUser[[]dataset.ID] + IDF +} + +func newTagsUserToUser(cfg UserToUserConfig, n int, timestamp time.Time, idf []float32) (UserToUser, error) { + // Compile column expression + columnFunc, err := expr.Compile(cfg.Column, expr.Env(map[string]any{ + "user": data.User{}, + })) + if err != nil { + return nil, err + } + t := &tagsUserToUser{IDF: idf} + t.baseUserToUser = baseUserToUser[[]dataset.ID]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + columnFunc: columnFunc, + index: ann.NewHNSW[[]dataset.ID](t.distance), + } + return t, nil +} + +func (t *tagsUserToUser) Push(user *data.User, feedback []dataset.ID) { + // Evaluate filter function + result, err := expr.Run(t.columnFunc, map[string]any{ + "user": user, + }) + if err != nil { + log.Logger().Error("failed to evaluate column expression", zap.Error(err)) + return + } + // Extract tags + tSet := mapset.NewSet[dataset.ID]() + flatten(result, tSet) + v := tSet.ToSlice() + sort.Slice(v, func(i, j int) bool { + return v[i] < v[j] + }) + // Push user + t.users = append(t.users, user) + _, err = t.index.Add(v) + if err != nil { + log.Logger().Error("failed to add user to index", zap.Error(err)) + } +} + +type itemsUserToUser struct { + baseUserToUser[[]dataset.ID] + IDF +} + +func newItemsUserToUser(cfg UserToUserConfig, n int, timestamp time.Time, idf []float32) (UserToUser, error) { + if cfg.Column != "" { + return nil, errors.New("column is not supported in items user-to-user") + } + i := &itemsUserToUser{IDF: idf} + i.baseUserToUser = baseUserToUser[[]dataset.ID]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + index: ann.NewHNSW[[]dataset.ID](i.distance), + } + return i, nil +} + +func (i *itemsUserToUser) Push(user *data.User, feedback []dataset.ID) { + // Sort feedback + sort.Slice(feedback, func(i, j int) bool { + return feedback[i] < feedback[j] + }) + // Push user + i.users = append(i.users, user) + _, err := i.index.Add(feedback) + if err != nil { + log.Logger().Error("failed to add user to index", zap.Error(err)) + } +} + +type autoUserToUser struct { + baseUserToUser[lo.Tuple2[[]dataset.ID, []dataset.ID]] + tIDF IDF + iIDF IDF +} + +func newAutoUserToUser(cfg UserToUserConfig, n int, timestamp time.Time, tIDF, iIDF []float32) (UserToUser, error) { + a := &autoUserToUser{ + tIDF: tIDF, + iIDF: iIDF, + } + a.baseUserToUser = baseUserToUser[lo.Tuple2[[]dataset.ID, []dataset.ID]]{ + name: cfg.Name, + n: n, + timestamp: timestamp, + index: ann.NewHNSW[lo.Tuple2[[]dataset.ID, []dataset.ID]](a.distance), + } + return a, nil +} + +func (a *autoUserToUser) Push(user *data.User, feedback []dataset.ID) { + // Extract tags + tSet := mapset.NewSet[dataset.ID]() + flatten(user.Labels, tSet) + t := tSet.ToSlice() + sort.Slice(t, func(i, j int) bool { + return t[i] < t[j] + }) + // Sort feedback + sort.Slice(feedback, func(i, j int) bool { + return feedback[i] < feedback[j] + }) + // Push user + a.users = append(a.users, user) + _, err := a.index.Add(lo.Tuple2[[]dataset.ID, []dataset.ID]{A: t, B: feedback}) + if err != nil { + log.Logger().Error("failed to add user to index", zap.Error(err)) + } +} + +func (a *autoUserToUser) distance(u, v lo.Tuple2[[]dataset.ID, []dataset.ID]) float32 { + return (a.tIDF.distance(u.A, v.A) + a.iIDF.distance(u.B, v.B)) / 2 +} diff --git a/logics/user_to_user_test.go b/logics/user_to_user_test.go new file mode 100644 index 000000000..16101b283 --- /dev/null +++ b/logics/user_to_user_test.go @@ -0,0 +1,163 @@ +// Copyright 2025 gorse Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logics + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/zhenghaoz/gorse/dataset" + "github.com/zhenghaoz/gorse/storage/cache" + "github.com/zhenghaoz/gorse/storage/data" +) + +type UserToUserTestSuite struct { + suite.Suite +} + +func (suite *UserToUserTestSuite) TestEmbedding() { + timestamp := time.Now() + user2user, err := newEmbeddingUserToUser(UserToUserConfig{ + Column: "user.Labels.description", + }, 10, timestamp) + suite.NoError(err) + + for i := 0; i < 100; i++ { + user2user.Push(&data.User{ + UserId: strconv.Itoa(i), + Labels: map[string]any{ + "description": []float32{0.1 * float32(i), 0.2 * float32(i), 0.3 * float32(i)}, + }, + }, nil) + } + + var scores []cache.Score + user2user.PopAll(func(userId string, score []cache.Score) { + if userId == "0" { + scores = score + } + }) + suite.Len(scores, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i), scores[i-1].Id) + } +} + +func (suite *UserToUserTestSuite) TestTags() { + timestamp := time.Now() + idf := make([]float32, 101) + for i := range idf { + idf[i] = 1 + } + user2user, err := newTagsUserToUser(UserToUserConfig{ + Column: "user.Labels", + }, 10, timestamp, idf) + suite.NoError(err) + + for i := 0; i < 100; i++ { + labels := make(map[string]any) + for j := 1; j <= 100-i; j++ { + labels[strconv.Itoa(j)] = []dataset.ID{dataset.ID(j)} + } + user2user.Push(&data.User{ + UserId: strconv.Itoa(i), + Labels: labels, + }, nil) + } + + var scores []cache.Score + user2user.PopAll(func(userId string, score []cache.Score) { + if userId == "0" { + scores = score + } + }) + suite.Len(scores, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i), scores[i-1].Id) + } +} + +func (suite *UserToUserTestSuite) TestItems() { + timestamp := time.Now() + idf := make([]float32, 101) + for i := range idf { + idf[i] = 1 + } + user2user, err := newItemsUserToUser(UserToUserConfig{}, 10, timestamp, idf) + suite.NoError(err) + + for i := 0; i < 100; i++ { + feedback := make([]dataset.ID, 0, 100-i) + for j := 1; j <= 100-i; j++ { + feedback = append(feedback, dataset.ID(j)) + } + user2user.Push(&data.User{UserId: strconv.Itoa(i)}, feedback) + } + + var scores []cache.Score + user2user.PopAll(func(userId string, score []cache.Score) { + if userId == "0" { + scores = score + } + }) + suite.Len(scores, 10) + for i := 1; i <= 10; i++ { + suite.Equal(strconv.Itoa(i), scores[i-1].Id) + } +} + +func (suite *UserToUserTestSuite) TestAuto() { + timestamp := time.Now() + idf := make([]float32, 101) + for i := range idf { + idf[i] = 1 + } + user2user, err := newAutoUserToUser(UserToUserConfig{}, 10, timestamp, idf, idf) + suite.NoError(err) + + for i := 0; i < 100; i++ { + user := &data.User{UserId: strconv.Itoa(i)} + feedback := make([]dataset.ID, 0, 100-i) + if i%2 == 0 { + labels := make(map[string]any) + for j := 1; j <= 100-i; j++ { + labels[strconv.Itoa(j)] = []dataset.ID{dataset.ID(j)} + } + user.Labels = labels + } else { + for j := 1; j <= 100-i; j++ { + feedback = append(feedback, dataset.ID(j)) + } + } + user2user.Push(user, feedback) + } + + var scores0, scores1 []cache.Score + user2user.PopAll(func(userId string, score []cache.Score) { + if userId == "0" { + scores0 = score + } else if userId == "1" { + scores1 = score + } + }) + suite.Len(scores0, 10) + suite.Len(scores1, 10) +} + +func TestUserToUser(t *testing.T) { + suite.Run(t, new(UserToUserTestSuite)) +} diff --git a/master/master.go b/master/master.go index 05a102e70..59eb250f4 100644 --- a/master/master.go +++ b/master/master.go @@ -330,8 +330,6 @@ func (m *Master) RunPrivilegedTasksLoop() { tasks = []Task{ NewFitClickModelTask(m), NewFitRankingModelTask(m), - NewFindUserNeighborsTask(m), - NewFindItemNeighborsTask(m), } firstLoop = true ) @@ -430,8 +428,6 @@ func (m *Master) RunManagedTasksLoop() { privilegedTasks = []Task{ NewFitClickModelTask(m), NewFitRankingModelTask(m), - NewFindUserNeighborsTask(m), - NewFindItemNeighborsTask(m), } ragtagTasks = []Task{ NewCacheGarbageCollectionTask(m), diff --git a/master/rest.go b/master/rest.go index 3ad9fcdc7..9196f2440 100644 --- a/master/rest.go +++ b/master/rest.go @@ -849,8 +849,10 @@ func (m *Master) getNonPersonalized(request *restful.Request, response *restful. } func (m *Master) getItemToItem(request *restful.Request, response *restful.Response) { + name := request.PathParameter("name") itemId := request.PathParameter("item-id") - m.SearchDocuments(cache.ItemToItem, cache.Key(cache.Neighbors, itemId), nil, m.GetItem, request, response) + categories := request.QueryParameters("category") + m.SearchDocuments(cache.ItemToItem, cache.Key(name, itemId), categories, m.GetItem, request, response) } func (m *Master) getUserToUser(request *restful.Request, response *restful.Response) { diff --git a/master/rest_test.go b/master/rest_test.go index 83121f990..757dc4d2b 100644 --- a/master/rest_test.go +++ b/master/rest_test.go @@ -513,11 +513,12 @@ func TestServer_SearchDocumentsOfItems(t *testing.T) { } ctx := context.Background() operators := []ListOperator{ - {"Item Neighbors", cache.ItemToItem, cache.Key(cache.Neighbors, "0"), "", "/api/dashboard/item-to-item/neighbors/0"}, - {"Latest Items", cache.NonPersonalized, cache.Latest, "", "/api/dashboard/non-personalized/latest/"}, - {"Popular Items", cache.NonPersonalized, cache.Popular, "", "/api/dashboard/non-personalized/popular/"}, - {"Latest Items in Category", cache.NonPersonalized, cache.Latest, "*", "/api/dashboard/non-personalized/latest/"}, - {"Popular Items in Category", cache.NonPersonalized, cache.Popular, "*", "/api/dashboard/non-personalized/popular/"}, + {"ItemToItem", cache.ItemToItem, cache.Key(cache.Neighbors, "0"), "", "/api/dashboard/item-to-item/neighbors/0"}, + {"ItemToItemCategory", cache.ItemToItem, cache.Key(cache.Neighbors, "0"), "*", "/api/dashboard/item-to-item/neighbors/0"}, + {"LatestItems", cache.NonPersonalized, cache.Latest, "", "/api/dashboard/non-personalized/latest/"}, + {"PopularItems", cache.NonPersonalized, cache.Popular, "", "/api/dashboard/non-personalized/popular/"}, + {"LatestItemsCategory", cache.NonPersonalized, cache.Latest, "*", "/api/dashboard/non-personalized/latest/"}, + {"PopularItemsCategory", cache.NonPersonalized, cache.Popular, "*", "/api/dashboard/non-personalized/popular/"}, } for i, operator := range operators { t.Run(operator.Name, func(t *testing.T) { diff --git a/master/tasks.go b/master/tasks.go index 680da86c5..fded47ced 100644 --- a/master/tasks.go +++ b/master/tasks.go @@ -22,13 +22,11 @@ import ( "sync" "time" - "github.com/chewxy/math32" mapset "github.com/deckarep/golang-set/v2" "github.com/juju/errors" "github.com/samber/lo" "github.com/zhenghaoz/gorse/base" "github.com/zhenghaoz/gorse/base/encoding" - "github.com/zhenghaoz/gorse/base/heap" "github.com/zhenghaoz/gorse/base/log" "github.com/zhenghaoz/gorse/base/parallel" "github.com/zhenghaoz/gorse/base/progress" @@ -41,25 +39,20 @@ import ( "github.com/zhenghaoz/gorse/model/ranking" "github.com/zhenghaoz/gorse/storage/cache" "github.com/zhenghaoz/gorse/storage/data" - "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" - "modernc.org/sortutil" ) const ( PositiveFeedbackRate = "PositiveFeedbackRate" - TaskFindItemNeighbors = "Find neighbors of items" - TaskFindUserNeighbors = "Find neighbors of users" TaskFitRankingModel = "Fit collaborative filtering model" TaskFitClickModel = "Fit click-through rate prediction model" TaskSearchRankingModel = "Search collaborative filtering model" TaskSearchClickModel = "Search click-through rate prediction model" TaskCacheGarbageCollection = "Collect garbage in cache" - batchSize = 10000 - similarityShrink = 100 + batchSize = 10000 ) type Task interface { @@ -202,6 +195,9 @@ func (m *Master) runLoadDatasetTask() error { MemoryInUseBytesVec.WithLabelValues("ranking_train_set").Set(float64(sizeof.DeepSize(m.clickTrainSet))) MemoryInUseBytesVec.WithLabelValues("ranking_test_set").Set(float64(sizeof.DeepSize(m.clickTestSet))) + if err = m.updateUserToUser(dataSet); err != nil { + log.Logger().Error("failed to update user-to-user recommendation", zap.Error(err)) + } if err = m.updateItemToItem(dataSet); err != nil { log.Logger().Error("failed to update item-to-item recommendation", zap.Error(err)) } @@ -210,572 +206,6 @@ func (m *Master) runLoadDatasetTask() error { return nil } -// FindItemNeighborsTask updates neighbors of items. -type FindItemNeighborsTask struct { - *Master - lastNumItems int - lastNumFeedback int -} - -func NewFindItemNeighborsTask(m *Master) *FindItemNeighborsTask { - return &FindItemNeighborsTask{Master: m} -} - -func (t *FindItemNeighborsTask) name() string { - return TaskFindItemNeighbors -} - -func (t *FindItemNeighborsTask) priority() int { - return -t.rankingTrainSet.ItemCount() * t.rankingTrainSet.ItemCount() -} - -func (t *FindItemNeighborsTask) run(ctx context.Context, j *task.JobsAllocator) error { - t.rankingDataMutex.RLock() - defer t.rankingDataMutex.RUnlock() - dataset := t.rankingTrainSet - numItems := dataset.ItemCount() - numFeedback := dataset.Count() - - newCtx, span := t.tracer.Start(ctx, "Find Item Neighbors", dataset.ItemCount()) - defer span.End() - - if numItems == 0 { - return nil - } else if numItems == t.lastNumItems && numFeedback == t.lastNumFeedback { - log.Logger().Info("No item neighbors need to be updated.") - return nil - } - - startTaskTime := time.Now() - log.Logger().Info("start searching neighbors of items", - zap.Int("n_cache", t.Config.Recommend.CacheSize)) - // create progress tracker - completed := make(chan struct{}, 1000) - go func() { - completedCount, previousCount := 0, 0 - ticker := time.NewTicker(time.Second * 10) - for { - select { - case _, ok := <-completed: - if !ok { - return - } - completedCount++ - case <-ticker.C: - throughput := completedCount - previousCount - previousCount = completedCount - if throughput > 0 { - log.Logger().Debug("searching neighbors of items", - zap.Int("n_complete_items", completedCount), - zap.Int("n_items", dataset.ItemCount()), - zap.Int("throughput", throughput/10)) - span.Add(throughput) - } - } - } - }() - - userIDF := make([]float32, dataset.UserCount()) - if t.Config.Recommend.ItemNeighbors.NeighborType == config.NeighborTypeRelated || - t.Config.Recommend.ItemNeighbors.NeighborType == config.NeighborTypeAuto { - for _, feedbacks := range dataset.ItemFeedback { - sort.Sort(sortutil.Int32Slice(feedbacks)) - } - // inverse document frequency of users - for i := range dataset.UserFeedback { - if dataset.ItemCount() == len(dataset.UserFeedback[i]) { - userIDF[i] = 1 - } else { - userIDF[i] = math32.Log(float32(dataset.ItemCount()) / float32(len(dataset.UserFeedback[i]))) - } - } - } - labeledItems := make([][]int32, dataset.NumItemLabels) - labelIDF := make([]float32, dataset.NumItemLabels) - if t.Config.Recommend.ItemNeighbors.NeighborType == config.NeighborTypeSimilar || - t.Config.Recommend.ItemNeighbors.NeighborType == config.NeighborTypeAuto { - for i, itemLabels := range dataset.ItemFeatures { - sort.Slice(itemLabels, func(i, j int) bool { - return itemLabels[i].A < itemLabels[j].A - }) - for _, label := range itemLabels { - labeledItems[label.A] = append(labeledItems[label.A], int32(i)) - } - } - // inverse document frequency of labels - for i := range labeledItems { - labeledItems[i] = lo.Uniq(labeledItems[i]) - if dataset.ItemCount() == len(labeledItems[i]) { - labelIDF[i] = 1 - } else { - labelIDF[i] = math32.Log(float32(dataset.ItemCount()) / float32(len(labeledItems[i]))) - } - } - } - - start := time.Now() - err := t.findItemNeighborsBruteForce(dataset, labeledItems, labelIDF, userIDF, completed, j) - searchTime := time.Since(start) - - close(completed) - if err != nil { - log.Logger().Error("failed to searching neighbors of items", zap.Error(err)) - progress.Fail(newCtx, err) - FindItemNeighborsTotalSeconds.Set(0) - } else { - if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.ItemToItemUpdateTime, cache.Neighbors), time.Now())); err != nil { - log.Logger().Error("failed to set neighbors of items update time", zap.Error(err)) - } - log.Logger().Info("complete searching neighbors of items", - zap.String("search_time", searchTime.String())) - FindItemNeighborsTotalSeconds.Set(time.Since(startTaskTime).Seconds()) - } - - t.lastNumItems = numItems - t.lastNumFeedback = numFeedback - return nil -} - -func (m *Master) findItemNeighborsBruteForce(dataset *ranking.DataSet, labeledItems [][]int32, - labelIDF, userIDF []float32, completed chan struct{}, j *task.JobsAllocator) error { - ctx := context.Background() - var ( - updateItemCount atomic.Float64 - findNeighborSeconds atomic.Float64 - ) - - var vector VectorsInterface - switch m.Config.Recommend.ItemNeighbors.NeighborType { - case config.NeighborTypeSimilar: - vector = NewVectors(lo.Map(dataset.ItemFeatures, func(features []lo.Tuple2[int32, float32], _ int) []int32 { - indices, _ := lo.Unzip2(features) - return indices - }), labeledItems, labelIDF) - case config.NeighborTypeRelated: - vector = NewVectors(dataset.ItemFeedback, dataset.UserFeedback, userIDF) - case config.NeighborTypeAuto: - vector = NewDualVectors( - NewVectors(lo.Map(dataset.ItemFeatures, func(features []lo.Tuple2[int32, float32], _ int) []int32 { - indices, _ := lo.Unzip2(features) - return indices - }), labeledItems, labelIDF), - NewVectors(dataset.ItemFeedback, dataset.UserFeedback, userIDF)) - default: - return errors.NotImplementedf("item neighbor type `%v`", m.Config.Recommend.ItemNeighbors.NeighborType) - } - - err := parallel.DynamicParallel(dataset.ItemCount(), j, func(workerId, itemIndex int) error { - defer func() { - completed <- struct{}{} - }() - startSearchTime := time.Now() - itemId := dataset.ItemIndex.ToName(int32(itemIndex)) - if !m.checkItemNeighborCacheTimeout(itemId, dataset.CategorySet.ToSlice()) { - return nil - } - updateItemCount.Add(1) - startTime := time.Now() - nearItemsFilters := make(map[string]*heap.TopKFilter[int32, float64]) - nearItemsFilters[""] = heap.NewTopKFilter[int32, float64](m.Config.Recommend.CacheSize) - for _, category := range dataset.CategorySet.ToSlice() { - nearItemsFilters[category] = heap.NewTopKFilter[int32, float64](m.Config.Recommend.CacheSize) - } - - adjacencyItems := vector.Neighbors(itemIndex) - for _, j := range adjacencyItems { - if j != int32(itemIndex) && !dataset.HiddenItems[j] { - score := vector.Distance(itemIndex, int(j)) - if score > 0 { - nearItemsFilters[""].Push(j, float64(score)) - for _, category := range dataset.ItemCategories[j] { - nearItemsFilters[category].Push(j, float64(score)) - } - } - } - } - - aggregator := cache.NewDocumentAggregator(startSearchTime) - for category, nearItemsFilter := range nearItemsFilters { - elem, scores := nearItemsFilter.PopAll() - recommends := make([]string, len(elem)) - for i := range recommends { - recommends[i] = dataset.ItemIndex.ToName(elem[i]) - } - aggregator.Add(category, recommends, scores) - } - if err := m.CacheClient.AddScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, itemId), aggregator.ToSlice()); err != nil { - return errors.Trace(err) - } - if err := m.CacheClient.DeleteScores(ctx, []string{cache.ItemToItem}, cache.ScoreCondition{ - Subset: proto.String(cache.Key(cache.Neighbors, itemId)), - Before: &aggregator.Timestamp, - }); err != nil { - return errors.Trace(err) - } - if err := m.CacheClient.Set( - ctx, - cache.Time(cache.Key(cache.ItemToItemUpdateTime, cache.Key(cache.Neighbors, itemId)), time.Now()), - cache.String(cache.Key(cache.ItemToItemDigest, cache.Key(cache.Neighbors, itemId)), m.Config.ItemNeighborDigest())); err != nil { - return errors.Trace(err) - } - findNeighborSeconds.Add(time.Since(startTime).Seconds()) - return nil - }) - if err != nil { - return errors.Trace(err) - } - UpdateItemNeighborsTotal.Set(updateItemCount.Load()) - FindItemNeighborsSecondsVec.WithLabelValues("find_item_neighbors").Set(findNeighborSeconds.Load()) - FindItemNeighborsSecondsVec.WithLabelValues("build_index").Set(0) - ItemNeighborIndexRecall.Set(1) - return nil -} - -// FindUserNeighborsTask updates neighbors of users. -type FindUserNeighborsTask struct { - *Master - lastNumUsers int - lastNumFeedback int -} - -func NewFindUserNeighborsTask(m *Master) *FindUserNeighborsTask { - return &FindUserNeighborsTask{Master: m} -} - -func (t *FindUserNeighborsTask) name() string { - return TaskFindUserNeighbors -} - -func (t *FindUserNeighborsTask) priority() int { - return -t.rankingTrainSet.UserCount() * t.rankingTrainSet.UserCount() -} - -func (t *FindUserNeighborsTask) run(ctx context.Context, j *task.JobsAllocator) error { - t.rankingDataMutex.RLock() - defer t.rankingDataMutex.RUnlock() - dataset := t.rankingTrainSet - numUsers := dataset.UserCount() - numFeedback := dataset.Count() - - newCtx, span := t.tracer.Start(ctx, "Find User Neighbors", dataset.UserCount()) - defer span.End() - - if numUsers == 0 { - return nil - } else if numUsers == t.lastNumUsers && numFeedback == t.lastNumFeedback { - log.Logger().Info("No update of user neighbors needed.") - return nil - } - - startTaskTime := time.Now() - log.Logger().Info("start searching neighbors of users", - zap.Int("n_cache", t.Config.Recommend.CacheSize)) - // create progress tracker - completed := make(chan struct{}, 1000) - go func() { - completedCount, previousCount := 0, 0 - ticker := time.NewTicker(time.Second) - for { - select { - case _, ok := <-completed: - if !ok { - return - } - completedCount++ - case <-ticker.C: - throughput := completedCount - previousCount - previousCount = completedCount - if throughput > 0 { - log.Logger().Debug("searching neighbors of users", - zap.Int("n_complete_users", completedCount), - zap.Int("n_users", dataset.UserCount()), - zap.Int("throughput", throughput)) - span.Add(throughput) - } - } - } - }() - - itemIDF := make([]float32, dataset.ItemCount()) - if t.Config.Recommend.UserNeighbors.NeighborType == config.NeighborTypeRelated || - t.Config.Recommend.UserNeighbors.NeighborType == config.NeighborTypeAuto { - for _, feedbacks := range dataset.UserFeedback { - sort.Sort(sortutil.Int32Slice(feedbacks)) - } - // inverse document frequency of items - for i := range dataset.ItemFeedback { - if dataset.UserCount() == len(dataset.ItemFeedback[i]) { - itemIDF[i] = 1 - } else { - itemIDF[i] = math32.Log(float32(dataset.UserCount()) / float32(len(dataset.ItemFeedback[i]))) - } - } - } - labeledUsers := make([][]int32, dataset.NumUserLabels) - labelIDF := make([]float32, dataset.NumUserLabels) - if t.Config.Recommend.UserNeighbors.NeighborType == config.NeighborTypeSimilar || - t.Config.Recommend.UserNeighbors.NeighborType == config.NeighborTypeAuto { - for i, userLabels := range dataset.UserFeatures { - sort.Slice(userLabels, func(i, j int) bool { - return userLabels[i].A < userLabels[j].A - }) - for _, label := range userLabels { - labeledUsers[label.A] = append(labeledUsers[label.A], int32(i)) - } - } - // inverse document frequency of labels - for i := range labeledUsers { - labeledUsers[i] = lo.Uniq(labeledUsers[i]) - if dataset.UserCount() == len(labeledUsers[i]) { - labelIDF[i] = 1 - } else { - labelIDF[i] = math32.Log(float32(dataset.UserCount()) / float32(len(labeledUsers[i]))) - } - } - } - - start := time.Now() - err := t.findUserNeighborsBruteForce(newCtx, dataset, labeledUsers, labelIDF, itemIDF, completed, j) - searchTime := time.Since(start) - - close(completed) - if err != nil { - log.Logger().Error("failed to searching neighbors of users", zap.Error(err)) - progress.Fail(newCtx, err) - FindUserNeighborsTotalSeconds.Set(0) - } else { - if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.UserToUserUpdateTime, cache.Neighbors), time.Now())); err != nil { - log.Logger().Error("failed to set neighbors of users update time", zap.Error(err)) - } - log.Logger().Info("complete searching neighbors of users", - zap.String("search_time", searchTime.String())) - FindUserNeighborsTotalSeconds.Set(time.Since(startTaskTime).Seconds()) - } - - t.lastNumUsers = numUsers - t.lastNumFeedback = numFeedback - return nil -} - -func (m *Master) findUserNeighborsBruteForce(ctx context.Context, dataset *ranking.DataSet, labeledUsers [][]int32, labelIDF, itemIDF []float32, completed chan struct{}, j *task.JobsAllocator) error { - var ( - updateUserCount atomic.Float64 - findNeighborSeconds atomic.Float64 - ) - - var vectors VectorsInterface - switch m.Config.Recommend.UserNeighbors.NeighborType { - case config.NeighborTypeSimilar: - vectors = NewVectors(lo.Map(dataset.UserFeatures, func(features []lo.Tuple2[int32, float32], _ int) []int32 { - indices, _ := lo.Unzip2(features) - return indices - }), labeledUsers, labelIDF) - case config.NeighborTypeRelated: - vectors = NewVectors(dataset.UserFeedback, dataset.ItemFeedback, itemIDF) - case config.NeighborTypeAuto: - vectors = NewDualVectors( - NewVectors(lo.Map(dataset.UserFeatures, func(features []lo.Tuple2[int32, float32], _ int) []int32 { - indices, _ := lo.Unzip2(features) - return indices - }), labeledUsers, labelIDF), - NewVectors(dataset.UserFeedback, dataset.ItemFeedback, itemIDF)) - default: - return errors.NotImplementedf("user neighbor type `%v`", m.Config.Recommend.UserNeighbors.NeighborType) - } - - err := parallel.DynamicParallel(dataset.UserCount(), j, func(workerId, userIndex int) error { - defer func() { - completed <- struct{}{} - }() - startSearchTime := time.Now() - userId := dataset.UserIndex.ToName(int32(userIndex)) - if !m.checkUserNeighborCacheTimeout(userId) { - return nil - } - updateUserCount.Add(1) - startTime := time.Now() - nearUsers := heap.NewTopKFilter[int32, float64](m.Config.Recommend.CacheSize) - - adjacencyUsers := vectors.Neighbors(userIndex) - for _, j := range adjacencyUsers { - if j != int32(userIndex) { - score := vectors.Distance(userIndex, int(j)) - if score > 0 { - nearUsers.Push(j, float64(score)) - } - } - } - - elem, scores := nearUsers.PopAll() - recommends := make([]string, len(elem)) - for i := range recommends { - recommends[i] = dataset.UserIndex.ToName(elem[i]) - } - aggregator := cache.NewDocumentAggregator(startSearchTime) - aggregator.Add("", recommends, scores) - if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), aggregator.ToSlice()); err != nil { - return errors.Trace(err) - } - if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserToUser}, cache.ScoreCondition{ - Subset: proto.String(cache.Key(cache.Neighbors, userId)), - Before: &aggregator.Timestamp, - }); err != nil { - return errors.Trace(err) - } - if err := m.CacheClient.Set( - ctx, - cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()), - cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest())); err != nil { - return errors.Trace(err) - } - findNeighborSeconds.Add(time.Since(startTime).Seconds()) - return nil - }) - if err != nil { - return errors.Trace(err) - } - UpdateUserNeighborsTotal.Set(updateUserCount.Load()) - FindUserNeighborsSecondsVec.WithLabelValues("find_item_neighbors").Set(findNeighborSeconds.Load()) - FindUserNeighborsSecondsVec.WithLabelValues("build_index").Set(0) - UserNeighborIndexRecall.Set(1) - return nil -} - -func commonElements(a, b []int32, weights []float32) (float32, float32) { - i, j, sum, count := 0, 0, float32(0), float32(0) - for i < len(a) && j < len(b) { - if a[i] == b[j] { - sum += weights[a[i]] - count++ - i++ - j++ - } else if a[i] < b[j] { - i++ - } else if a[i] > b[j] { - j++ - } - } - return sum, count -} - -func weightedSum(a []int32, weights []float32) float32 { - var sum float32 - for _, i := range a { - sum += weights[i] - } - return sum -} - -// checkUserNeighborCacheTimeout checks if user neighbor cache stale. -// 1. if cache is empty, stale. -// 2. if modified time > update time, stale. -func (m *Master) checkUserNeighborCacheTimeout(userId string) bool { - var ( - modifiedTime time.Time - updateTime time.Time - cacheDigest string - err error - ) - ctx := context.Background() - // check cache - if items, err := m.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, -1); err != nil { - log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err)) - return true - } else if len(items) == 0 { - return true - } - // read digest - cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId))).String() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read user neighbors digest", zap.Error(err)) - } - return true - } - if cacheDigest != m.Config.UserNeighborDigest() { - return true - } - // read modified time - modifiedTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastModifyUserTime, userId)).Time() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read last modify user time", zap.Error(err)) - } - return true - } - // read update time - updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId))).Time() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read last update user neighbors time", zap.Error(err)) - } - return true - } - // check cache expire - if updateTime.Before(time.Now().Add(-m.Config.Recommend.CacheExpire)) { - return true - } - // check time - return updateTime.Unix() <= modifiedTime.Unix() -} - -// checkItemNeighborCacheTimeout checks if item neighbor cache stale. -// 1. if cache is empty, stale. -// 2. if modified time > update time, stale. -func (m *Master) checkItemNeighborCacheTimeout(itemId string, categories []string) bool { - var ( - modifiedTime time.Time - updateTime time.Time - cacheDigest string - err error - ) - ctx := context.Background() - - // check cache - for _, category := range append([]string{""}, categories...) { - items, err := m.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, itemId), []string{category}, 0, -1) - if err != nil { - log.Logger().Error("failed to load item neighbors", zap.String("item_id", itemId), zap.Error(err)) - return true - } else if len(items) == 0 { - return true - } - } - // read digest - cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.ItemToItemDigest, cache.Key(cache.Neighbors, itemId))).String() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read item neighbors digest", zap.Error(err)) - } - return true - } - if cacheDigest != m.Config.ItemNeighborDigest() { - return true - } - // read modified time - modifiedTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastModifyItemTime, itemId)).Time() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read last modify item time", zap.Error(err)) - } - return true - } - // read update time - updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.ItemToItemUpdateTime, cache.Key(cache.Neighbors, itemId))).Time() - if err != nil { - if !errors.Is(err, errors.NotFound) { - log.Logger().Error("failed to read last update item neighbors time", zap.Error(err)) - } - return true - } - // check cache expire - if updateTime.Before(time.Now().Add(-m.Config.Recommend.CacheExpire)) { - return true - } - // check time - return updateTime.Unix() <= modifiedTime.Unix() -} - type FitRankingModelTask struct { *Master lastNumFeedback int @@ -1222,7 +652,7 @@ func (m *Master) LoadDataFromDatabase( return nil, nil, nil, errors.Trace(err) } - dataSet = dataset.NewDataset(time.Now(), estimatedNumItems) + dataSet = dataset.NewDataset(time.Now(), estimatedNumUsers, estimatedNumItems) newCtx, span := progress.Start(ctx, "LoadDataFromDatabase", estimatedNumUsers+estimatedNumItems+estimatedNumFeedbacks) @@ -1284,6 +714,7 @@ func (m *Master) LoadDataFromDatabase( }) } } + dataSet.AddUser(user) } span.Add(len(users)) } @@ -1430,6 +861,7 @@ func (m *Master) LoadDataFromDatabase( break } } + dataSet.AddFeedback(f.UserId, f.ItemId) } span.Add(len(feedback)) } @@ -1558,13 +990,31 @@ func (m *Master) LoadDataFromDatabase( func (m *Master) updateItemToItem(dataset *dataset.Dataset) error { ctx, span := m.tracer.Start(context.Background(), "Generate item-to-item recommendation", - len(dataset.GetItems())*len(m.Config.Recommend.ItemToItem)*2) + len(dataset.GetItems())*(len(m.Config.Recommend.ItemToItem)+1)*2) defer span.End() + // Add built-in item-to-item recommenders + itemToItemConfigs := m.Config.Recommend.ItemToItem + builtInConfig := config.ItemToItemConfig{} + builtInConfig.Name = cache.Neighbors + switch m.Config.Recommend.ItemNeighbors.NeighborType { + case config.NeighborTypeSimilar: + builtInConfig.Type = "tags" + builtInConfig.Column = "item.Labels" + case config.NeighborTypeRelated: + builtInConfig.Type = "users" + case config.NeighborTypeAuto: + builtInConfig.Type = "auto" + } + itemToItemConfigs = append(itemToItemConfigs, builtInConfig) + // Build item-to-item recommenders - itemToItemRecommenders := make([]logics.ItemToItem, 0, len(m.Config.Recommend.ItemToItem)) - for _, cfg := range m.Config.Recommend.ItemToItem { - recommender, err := logics.NewItemToItem(cfg, m.Config.Recommend.CacheSize, dataset.GetTimestamp(), dataset.GetItemColumnValuesIDF()) + itemToItemRecommenders := make([]logics.ItemToItem, 0, len(itemToItemConfigs)) + for _, cfg := range itemToItemConfigs { + recommender, err := logics.NewItemToItem(cfg, m.Config.Recommend.CacheSize, dataset.GetTimestamp(), &logics.ItemToItemOptions{ + TagsIDF: dataset.GetItemColumnValuesIDF(), + UsersIDF: dataset.GetUserIDF(), + }) if err != nil { return errors.Trace(err) } @@ -1572,10 +1022,10 @@ func (m *Master) updateItemToItem(dataset *dataset.Dataset) error { } // Push items to item-to-item recommenders - for _, item := range dataset.GetItems() { + for i, item := range dataset.GetItems() { if !item.IsHidden { for _, recommender := range itemToItemRecommenders { - recommender.Push(item) + recommender.Push(&item, dataset.GetItemFeedback()[i]) span.Add(1) } } @@ -1584,8 +1034,8 @@ func (m *Master) updateItemToItem(dataset *dataset.Dataset) error { // Save item-to-item recommendations to cache for i, recommender := range itemToItemRecommenders { recommender.PopAll(func(itemId string, score []cache.Score) { - itemToItemConfig := m.Config.Recommend.ItemToItem[i] - if m.needUpdateItemToItem(itemId, m.Config.Recommend.ItemToItem[i]) { + itemToItemConfig := itemToItemConfigs[i] + if m.needUpdateItemToItem(itemId, itemToItemConfigs[i]) { log.Logger().Debug("update item-to-item recommendation", zap.String("item_id", itemId), zap.String("name", itemToItemConfig.Name), @@ -1613,8 +1063,6 @@ func (m *Master) updateItemToItem(dataset *dataset.Dataset) error { } // needUpdateItemToItem checks if item-to-item recommendation needs to be updated. -// 1. The cache is empty. -// 2. The modified time is newer than the last update time. func (m *Master) needUpdateItemToItem(itemId string, itemToItemConfig config.ItemToItemConfig) bool { ctx := context.Background() @@ -1651,3 +1099,93 @@ func (m *Master) needUpdateItemToItem(itemId string, itemToItemConfig config.Ite } return updateTime.Before(time.Now().Add(-m.Config.Recommend.CacheExpire)) } + +func (m *Master) updateUserToUser(dataset *dataset.Dataset) error { + ctx, span := m.tracer.Start(context.Background(), "Generate user-to-user recommendation", + len(dataset.GetUsers())*2) + defer span.End() + + // Build user-to-user recommenders + var cfg logics.UserToUserConfig + cfg.Name = cache.Neighbors + switch m.Config.Recommend.UserNeighbors.NeighborType { + case config.NeighborTypeSimilar: + cfg.Type = "tags" + cfg.Column = "user.Labels" + case config.NeighborTypeRelated: + cfg.Type = "items" + case config.NeighborTypeAuto: + cfg.Type = "auto" + } + userToUserRecommender, err := logics.NewUserToUser(cfg, m.Config.Recommend.CacheSize, dataset.GetTimestamp(), &logics.UserToUserOptions{ + TagsIDF: dataset.GetUserColumnValuesIDF(), + ItemsIDF: dataset.GetItemIDF(), + }) + if err != nil { + return errors.Trace(err) + } + + // Push users to user-to-user recommender + for i, user := range dataset.GetUsers() { + userToUserRecommender.Push(&user, dataset.GetUserFeedback()[i]) + span.Add(1) + } + + // Save user-to-user recommendations to cache + userToUserRecommender.PopAll(func(userId string, score []cache.Score) { + if m.needUpdateUserToUser(userId) { + log.Logger().Debug("update user neighbors", + zap.String("user_id", userId), + zap.Int("n_recommendations", len(score))) + // Save user-to-user recommendations to cache + if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), score); err != nil { + log.Logger().Error("failed to save user neighbors to cache", zap.String("user_id", userId), zap.Error(err)) + return + } + // Save user-to-user digest and last update time to cache + if err := m.CacheClient.Set(ctx, + cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest()), + cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()), + ); err != nil { + log.Logger().Error("failed to save user neighbors digest to cache", zap.String("user_id", userId), zap.Error(err)) + return + } + } + }) + return nil +} + +// needUpdateUserToUser checks if user-to-user recommendation needs to be updated. +func (m *Master) needUpdateUserToUser(userId string) bool { + ctx := context.Background() + + // check cache + if items, err := m.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), nil, 0, -1); err != nil { + log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err)) + return true + } else if len(items) == 0 { + return true + } + + // read digest + cacheDigest, err := m.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId))).String() + if err != nil { + if !errors.Is(err, errors.NotFound) { + log.Logger().Error("failed to read user neighbors digest", zap.Error(err)) + } + return true + } + if cacheDigest != m.Config.UserNeighborDigest() { + return true + } + + // check update time + updateTime, err := m.CacheClient.Get(ctx, cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId))).Time() + if err != nil { + if !errors.Is(err, errors.NotFound) { + log.Logger().Error("failed to read last update user neighbors time", zap.Error(err)) + } + return true + } + return updateTime.Before(time.Now().Add(-m.Config.Recommend.CacheExpire)) +} diff --git a/master/tasks_test.go b/master/tasks_test.go index 1ea3c49e6..a56724096 100644 --- a/master/tasks_test.go +++ b/master/tasks_test.go @@ -26,7 +26,7 @@ import ( "github.com/zhenghaoz/gorse/storage/data" ) -func (s *MasterTestSuite) TestFindItemNeighborsBruteForce() { +func (s *MasterTestSuite) TestFindItemNeighbors() { ctx := context.Background() // create config s.Config = &config.Config{} @@ -81,36 +81,34 @@ func (s *MasterTestSuite) TestFindItemNeighborsBruteForce() { } // load mock dataset - dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, + dataset, _, dataSet, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator(), nil) s.NoError(err) s.rankingTrainSet = dataset // similar items (common users) s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated - neighborTask := NewFindItemNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err := s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) + s.NoError(s.updateItemToItem(dataSet)) + similar, err := s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), nil, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) // similar items in category (common users) similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{"*"}, 0, 100) s.NoError(err) - s.Equal([]string{"7", "5", "1"}, cache.ConvertDocumentsToValues(similar)) + s.Equal([]string{"7", "5"}, cache.ConvertDocumentsToValues(similar)) // similar items (common labels) err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now())) s.NoError(err) s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar - neighborTask = NewFindItemNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) + s.NoError(s.updateItemToItem(dataSet)) + similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), nil, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) // similar items in category (common labels) similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{"*"}, 0, 100) s.NoError(err) - s.Equal([]string{"0", "2", "6"}, cache.ConvertDocumentsToValues(similar)) + s.Equal([]string{"0", "2"}, cache.ConvertDocumentsToValues(similar)) // similar items (auto) err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now())) @@ -118,167 +116,16 @@ func (s *MasterTestSuite) TestFindItemNeighborsBruteForce() { err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "9"), time.Now())) s.NoError(err) s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeAuto - neighborTask = NewFindItemNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) + s.NoError(s.updateItemToItem(dataSet)) + similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), nil, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) - similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) + similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), nil, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) } -// -//func (s *MasterTestSuite) TestFindItemNeighborsIVF() { -// // create mock master -// ctx := context.Background() -// // create config -// s.Config = &config.Config{} -// s.Config.Recommend.CacheSize = 3 -// s.Config.Master.NumJobs = 4 -// s.Config.Recommend.ItemNeighbors.EnableIndex = true -// s.Config.Recommend.ItemNeighbors.IndexRecall = 1 -// s.Config.Recommend.ItemNeighbors.IndexFitEpoch = 10 -// // collect similar -// items := []data.Item{ -// {ItemId: "0", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d"}, Comment: ""}, -// {ItemId: "1", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""}, -// {ItemId: "2", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"b", "c", "d"}, Comment: ""}, -// {ItemId: "3", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""}, -// {ItemId: "4", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{"b", "c"}, Comment: ""}, -// {ItemId: "5", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""}, -// {ItemId: "6", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"c"}, Comment: ""}, -// {ItemId: "7", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""}, -// {ItemId: "8", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d", "e"}, Comment: ""}, -// {ItemId: "9", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""}, -// } -// feedbacks := make([]data.Feedback, 0) -// for i := 0; i < 10; i++ { -// for j := 0; j <= i; j++ { -// if i%2 == 1 { -// feedbacks = append(feedbacks, data.Feedback{ -// FeedbackKey: data.FeedbackKey{ -// ItemId: strconv.Itoa(i), -// UserId: strconv.Itoa(j), -// FeedbackType: "FeedbackType", -// }, -// Timestamp: time.Now(), -// }) -// } -// } -// } -// var err error -// err = s.DataClient.BatchInsertItems(ctx, items) -// s.NoError(err) -// err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true) -// s.NoError(err) -// -// // insert hidden item -// err = s.DataClient.BatchInsertItems(ctx, []data.Item{{ -// ItemId: "10", -// Labels: []string{"a", "b", "c", "d", "e"}, -// IsHidden: true, -// }}) -// s.NoError(err) -// for i := 0; i <= 10; i++ { -// err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{{ -// FeedbackKey: data.FeedbackKey{UserId: strconv.Itoa(i), ItemId: "10", FeedbackType: "FeedbackType"}, -// }}, true, true, true) -// s.NoError(err) -// } -// -// // load mock dataset -// dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, -// nil, 0, 0, NewOnlineEvaluator(), nil) -// s.NoError(err) -// s.rankingTrainSet = dataset -// -// // similar items (common users) -// s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated -// neighborTask := NewFindItemNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err := s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) -// // similar items in category (common users) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{"*"}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"7", "5", "1"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar items (common labels) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now())) -// s.NoError(err) -// s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar -// neighborTask = NewFindItemNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) -// // similar items in category (common labels) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{"*"}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"0", "2", "6"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar items (auto) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now())) -// s.NoError(err) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "9"), time.Now())) -// s.NoError(err) -// s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeAuto -// neighborTask = NewFindItemNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) -//} - -//func (s *MasterTestSuite) TestFindItemNeighborsIVF_ZeroIDF() { -// ctx := context.Background() -// // create config -// s.Config = &config.Config{} -// s.Config.Recommend.CacheSize = 3 -// s.Config.Master.NumJobs = 4 -// s.Config.Recommend.ItemNeighbors.EnableIndex = true -// s.Config.Recommend.ItemNeighbors.IndexRecall = 1 -// s.Config.Recommend.ItemNeighbors.IndexFitEpoch = 10 -// -// // create dataset -// err := s.DataClient.BatchInsertItems(ctx, []data.Item{ -// {ItemId: "0", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "a"}, Comment: ""}, -// {ItemId: "1", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "a"}, Comment: ""}, -// }) -// s.NoError(err) -// err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{ -// {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "0"}}, -// {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "1"}}, -// }, true, true, true) -// s.NoError(err) -// dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, -// nil, 0, 0, NewOnlineEvaluator(), nil) -// s.NoError(err) -// s.rankingTrainSet = dataset -// -// // similar items (common users) -// s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated -// neighborTask := NewFindItemNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err := s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "0"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar items (common labels) -// s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar -// neighborTask = NewFindItemNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "0"), []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar)) -//} - -func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { +func (s *MasterTestSuite) TestFindUserNeighbors() { ctx := context.Background() // create config s.Config = &config.Config{} @@ -317,16 +164,15 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { s.NoError(err) err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true) s.NoError(err) - dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, + dataset, _, dataSet, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator(), nil) s.NoError(err) s.rankingTrainSet = dataset // similar items (common users) s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated - neighborTask := NewFindUserNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err := s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) + s.NoError(s.updateUserToUser(dataSet)) + similar, err := s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), nil, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) @@ -334,9 +180,8 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now())) s.NoError(err) s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar - neighborTask = NewFindUserNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) + s.NoError(s.updateUserToUser(dataSet)) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), nil, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) @@ -346,141 +191,15 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "9"), time.Now())) s.NoError(err) s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto - neighborTask = NewFindUserNeighborsTask(&s.Master) - s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) + s.NoError(s.updateUserToUser(dataSet)) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), nil, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), nil, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) } -// -//func (s *MasterTestSuite) TestFindUserNeighborsIVF() { -// ctx := context.Background() -// // create config -// s.Config = &config.Config{} -// s.Config.Recommend.CacheSize = 3 -// s.Config.Master.NumJobs = 4 -// s.Config.Recommend.UserNeighbors.EnableIndex = true -// s.Config.Recommend.UserNeighbors.IndexRecall = 1 -// s.Config.Recommend.UserNeighbors.IndexFitEpoch = 10 -// // collect similar -// users := []data.User{ -// {UserId: "0", Labels: []string{"a", "b", "c", "d"}, Subscribe: nil, Comment: ""}, -// {UserId: "1", Labels: []string{}, Subscribe: nil, Comment: ""}, -// {UserId: "2", Labels: []string{"b", "c", "d"}, Subscribe: nil, Comment: ""}, -// {UserId: "3", Labels: []string{}, Subscribe: nil, Comment: ""}, -// {UserId: "4", Labels: []string{"b", "c"}, Subscribe: nil, Comment: ""}, -// {UserId: "5", Labels: []string{}, Subscribe: nil, Comment: ""}, -// {UserId: "6", Labels: []string{"c"}, Subscribe: nil, Comment: ""}, -// {UserId: "7", Labels: []string{}, Subscribe: nil, Comment: ""}, -// {UserId: "8", Labels: []string{"a", "b", "c", "d", "e"}, Subscribe: nil, Comment: ""}, -// {UserId: "9", Labels: []string{}, Subscribe: nil, Comment: ""}, -// } -// feedbacks := make([]data.Feedback, 0) -// for i := 0; i < 10; i++ { -// for j := 0; j <= i; j++ { -// if i%2 == 1 { -// feedbacks = append(feedbacks, data.Feedback{ -// FeedbackKey: data.FeedbackKey{ -// ItemId: strconv.Itoa(j), -// UserId: strconv.Itoa(i), -// FeedbackType: "FeedbackType", -// }, -// Timestamp: time.Now(), -// }) -// } -// } -// } -// var err error -// err = s.DataClient.BatchInsertUsers(ctx, users) -// s.NoError(err) -// err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true) -// s.NoError(err) -// dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, -// nil, 0, 0, NewOnlineEvaluator(), nil) -// s.NoError(err) -// s.rankingTrainSet = dataset -// -// // similar items (common users) -// s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated -// neighborTask := NewFindUserNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err := s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar items (common labels) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now())) -// s.NoError(err) -// s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar -// neighborTask = NewFindUserNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar items (auto) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now())) -// s.NoError(err) -// err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "9"), time.Now())) -// s.NoError(err) -// s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto -// neighborTask = NewFindUserNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) -//} -// -//func (s *MasterTestSuite) TestFindUserNeighborsIVF_ZeroIDF() { -// ctx := context.Background() -// // create config -// s.Config = &config.Config{} -// s.Config.Recommend.CacheSize = 3 -// s.Config.Master.NumJobs = 4 -// s.Config.Recommend.UserNeighbors.EnableIndex = true -// s.Config.Recommend.UserNeighbors.IndexRecall = 1 -// s.Config.Recommend.UserNeighbors.IndexFitEpoch = 10 -// -// // create dataset -// err := s.DataClient.BatchInsertUsers(ctx, []data.User{ -// {UserId: "0", Labels: []string{"a", "a"}, Subscribe: nil, Comment: ""}, -// {UserId: "1", Labels: []string{"a", "a"}, Subscribe: nil, Comment: ""}, -// }) -// s.NoError(err) -// err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{ -// {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "0"}}, -// {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "1", ItemId: "0"}}, -// }, true, true, true) -// s.NoError(err) -// dataset, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, -// nil, 0, 0, NewOnlineEvaluator(), nil) -// s.NoError(err) -// s.rankingTrainSet = dataset -// -// // similar users (common items) -// s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated -// neighborTask := NewFindUserNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err := s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "0", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar)) -// -// // similar users (common labels) -// s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar -// neighborTask = NewFindUserNeighborsTask(&s.Master) -// s.NoError(neighborTask.run(context.Background(), nil)) -// similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "0", []string{""}, 0, 100) -// s.NoError(err) -// s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar)) -//} - func (s *MasterTestSuite) TestLoadDataFromDatabase() { ctx := context.Background() // create config @@ -701,12 +420,13 @@ func (s *MasterTestSuite) TestNonPersonalizedRecommend() { })) } -func (s *MasterTestSuite) TestCheckItemNeighborCacheTimeout() { +func (s *MasterTestSuite) TestNeedUpdateItemToItem() { s.Config = config.GetDefaultConfig() + recommendConfig := config.ItemToItemConfig{Name: cache.Neighbors} ctx := context.Background() // empty cache - s.True(s.checkItemNeighborCacheTimeout("1", nil)) + s.True(s.needUpdateItemToItem("1", recommendConfig)) err := s.CacheClient.AddScores(ctx, cache.ItemToItem, cache.Key(cache.Neighbors, "1"), []cache.Score{ {Id: "2", Score: 1, Categories: []string{""}}, {Id: "3", Score: 2, Categories: []string{""}}, @@ -715,33 +435,30 @@ func (s *MasterTestSuite) TestCheckItemNeighborCacheTimeout() { s.NoError(err) // digest mismatch - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemToItemDigest, cache.Key(cache.Neighbors, "1")), "digest")) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemToItemDigest, cache.Neighbors, "1"), "digest")) s.NoError(err) - s.True(s.checkItemNeighborCacheTimeout("1", nil)) + s.True(s.needUpdateItemToItem("1", recommendConfig)) // staled cache - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemToItemDigest, cache.Key(cache.Neighbors, "1")), s.Config.ItemNeighborDigest())) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemToItemDigest, cache.Neighbors, "1"), recommendConfig.Hash())) s.NoError(err) - s.True(s.checkItemNeighborCacheTimeout("1", nil)) - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "1"), time.Now().Add(-time.Minute))) + s.True(s.needUpdateItemToItem("1", recommendConfig)) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.ItemToItemUpdateTime, cache.Neighbors, "1"), time.Now().Add(-s.Config.Recommend.CacheExpire))) s.NoError(err) - s.True(s.checkItemNeighborCacheTimeout("1", nil)) - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.ItemToItemUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now().Add(-time.Hour))) - s.NoError(err) - s.True(s.checkItemNeighborCacheTimeout("1", nil)) + s.True(s.needUpdateItemToItem("1", recommendConfig)) // not staled cache - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.ItemToItemUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now())) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.ItemToItemUpdateTime, cache.Neighbors, "1"), time.Now())) s.NoError(err) - s.False(s.checkItemNeighborCacheTimeout("1", nil)) + s.False(s.needUpdateItemToItem("1", recommendConfig)) } -func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() { +func (s *MasterTestSuite) TestNeedUpdateUserToUser() { ctx := context.Background() s.Config = config.GetDefaultConfig() // empty cache - s.True(s.checkUserNeighborCacheTimeout("1")) + s.True(s.needUpdateUserToUser("1")) err := s.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "1"), []cache.Score{ {Id: "1", Score: 1, Categories: []string{""}}, {Id: "2", Score: 2, Categories: []string{""}}, @@ -750,23 +467,20 @@ func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() { s.NoError(err) // digest mismatch - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), "digest")) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Neighbors, "1"), "digest")) s.NoError(err) - s.True(s.checkUserNeighborCacheTimeout("1")) + s.True(s.needUpdateUserToUser("1")) // staled cache - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), s.Config.UserNeighborDigest())) - s.NoError(err) - s.True(s.checkUserNeighborCacheTimeout("1")) - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().Add(-time.Minute))) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Neighbors, "1"), s.Config.UserNeighborDigest())) s.NoError(err) - s.True(s.checkUserNeighborCacheTimeout("1")) - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now().Add(-time.Hour))) + s.True(s.needUpdateUserToUser("1")) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Neighbors, "1"), time.Now().Add(-s.Config.Recommend.CacheExpire))) s.NoError(err) - s.True(s.checkUserNeighborCacheTimeout("1")) + s.True(s.needUpdateUserToUser("1")) // not staled cache - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now())) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Neighbors, "1"), time.Now())) s.NoError(err) - s.False(s.checkUserNeighborCacheTimeout("1")) + s.False(s.needUpdateUserToUser("1")) } diff --git a/master/vectors.go b/master/vectors.go deleted file mode 100644 index b100521d8..000000000 --- a/master/vectors.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2022 gorse Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package master - -import ( - "fmt" - "github.com/bits-and-blooms/bitset" - "github.com/chewxy/math32" - "github.com/samber/lo" - "github.com/zhenghaoz/gorse/base/search" - "reflect" -) - -type VectorsInterface interface { - Distance(i, j int) float32 - Neighbors(i int) []int32 -} - -type Vectors struct { - connections [][]int32 - connected [][]int32 - weights []float32 -} - -func NewVectors(connections, connected [][]int32, weights []float32) *Vectors { - if len(connected) != len(weights) { - panic("the length of connected and weights doesn't match") - } - return &Vectors{ - connections: connections, - connected: connected, - weights: weights, - } -} - -func (v *Vectors) Distance(i, j int) float32 { - commonSum, commonCount := commonElements(v.connections[i], v.connections[j], v.weights) - if commonCount > 0 { - return commonSum * commonCount / - math32.Sqrt(weightedSum(v.connections[i], v.weights)) / - math32.Sqrt(weightedSum(v.connections[j], v.weights)) / - (commonCount + similarityShrink) - } else { - return 0 - } -} - -func (v *Vectors) Neighbors(i int) []int32 { - connections := v.connections[i] - bitSet := bitset.New(uint(len(connections))) - var adjacent []int32 - for _, p := range connections { - for _, neighbor := range v.connected[p] { - if !bitSet.Test(uint(neighbor)) { - bitSet.Set(uint(neighbor)) - adjacent = append(adjacent, neighbor) - } - } - } - return adjacent -} - -type DualVectors struct { - first *Vectors - second *Vectors -} - -func NewDualVectors(first, second *Vectors) *DualVectors { - if len(first.connections) != len(second.connections) { - panic("the number of connections mismatch") - } - return &DualVectors{ - first: first, - second: second, - } -} - -func (v *DualVectors) Distance(i, j int) float32 { - return (v.first.Distance(i, j) + v.second.Distance(i, j)) / 2 -} - -func (v *DualVectors) Neighbors(i int) []int32 { - connections := v.first.connections[i] - bitSet := bitset.New(uint(len(connections))) - var adjacent []int32 - // iterate the first group - for _, p := range connections { - for _, neighbor := range v.first.connected[p] { - if !bitSet.Test(uint(neighbor)) { - bitSet.Set(uint(neighbor)) - adjacent = append(adjacent, neighbor) - } - } - } - // iterate the second group - for _, p := range v.second.connections[i] { - for _, neighbor := range v.second.connected[p] { - if !bitSet.Test(uint(neighbor)) { - bitSet.Set(uint(neighbor)) - adjacent = append(adjacent, neighbor) - } - } - } - return adjacent -} - -type DualDictionaryVector struct { - first *search.DictionaryVector - second *search.DictionaryVector -} - -func NewDualDictionaryVector( - indices1 []int32, values1 []float32, - indices2 []int32, values2 []float32, - terms []string, isHidden bool) *DualDictionaryVector { - return &DualDictionaryVector{ - first: search.NewDictionaryVector(indices1, values1, terms, isHidden), - second: search.NewDictionaryVector(indices2, values2, terms, isHidden), - } -} - -func (v *DualDictionaryVector) Distance(vector search.Vector) float32 { - switch typedVector := vector.(type) { - case *DualDictionaryVector: - return (v.first.Distance(typedVector.first) + v.second.Distance(typedVector.second)) / 2 - default: - panic(fmt.Sprintf("unexpected vector type: %v", reflect.TypeOf(vector))) - } -} - -func (v *DualDictionaryVector) Terms() []string { - return v.first.Terms() -} - -func (v *DualDictionaryVector) IsHidden() bool { - return v.first.IsHidden() -} - -func (v *DualDictionaryVector) Centroid(vectors []search.Vector, indices []int32) search.CentroidVector { - return &DualDictionaryCentroidVector{ - first: search.DictionaryVector{}.Centroid(lo.Map(vectors, func(v search.Vector, _ int) search.Vector { - switch typedVector := v.(type) { - case *DualDictionaryVector: - return typedVector.first - default: - panic(fmt.Sprintf("unexpected vector type: %v", reflect.TypeOf(v))) - } - }), indices), - second: search.DictionaryVector{}.Centroid(lo.Map(vectors, func(v search.Vector, _ int) search.Vector { - switch typedVector := v.(type) { - case *DualDictionaryVector: - return typedVector.second - default: - panic(fmt.Sprintf("unexpected vector type: %v", reflect.TypeOf(v))) - } - }), indices), - } -} - -type DualDictionaryCentroidVector struct { - first search.CentroidVector - second search.CentroidVector -} - -func (d *DualDictionaryCentroidVector) Distance(vector search.Vector) float32 { - switch typedVector := vector.(type) { - case *DualDictionaryVector: - return (d.first.Distance(typedVector.first) + d.second.Distance(typedVector.second)) / 2 - default: - panic(fmt.Sprintf("unexpected vector type: %v", reflect.TypeOf(vector))) - } -} diff --git a/server/rest.go b/server/rest.go index f262f5693..83a631ea5 100644 --- a/server/rest.go +++ b/server/rest.go @@ -709,7 +709,8 @@ func (s *RestServer) getNonPersonalized(request *restful.Request, response *rest func (s *RestServer) getItemToItem(request *restful.Request, response *restful.Response) { name := request.PathParameter("name") itemId := request.PathParameter("item-id") - s.SearchDocuments(cache.ItemToItem, cache.Key(name, itemId), nil, nil, request, response) + categories := request.QueryParameters("category") + s.SearchDocuments(cache.ItemToItem, cache.Key(name, itemId), categories, nil, request, response) } // get feedback by item-id with feedback type diff --git a/server/rest_test.go b/server/rest_test.go index 68df3eb3e..1e9adbd1a 100644 --- a/server/rest_test.go +++ b/server/rest_test.go @@ -829,6 +829,7 @@ func (suite *ServerTestSuite) TestNonPersonalizedRecommend() { {"NonPersonalized", cache.NonPersonalized, "trending", "", "/api/non-personalized/trending"}, {"NonPersonalizedCategory", cache.NonPersonalized, "trending", "0", "/api/non-personalized/trending"}, {"ItemToItem", cache.ItemToItem, cache.Key("lookalike", "0"), "", "/api/item-to-item/lookalike/0"}, + {"ItemToItemCategory", cache.ItemToItem, cache.Key("lookalike", "0"), "0", "/api/item-to-item/lookalike/0"}, {"Offline Recommend", cache.OfflineRecommend, "0", "", "/api/intermediate/recommend/0"}, {"Offline Recommend in Category", cache.OfflineRecommend, "0", "0", "/api/intermediate/recommend/0/0"}, }