Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sequential vector search #240

Merged
merged 67 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
2804257
implement vector storage & search in runtime
jairad26 May 19, 2024
50d7d93
Merge branch 'main' into vector-search
jairad26 May 24, 2024
acb6cef
make factory generic to control all indices
jairad26 May 28, 2024
fb2e775
remove unused variables
jairad26 May 28, 2024
d764bd8
lint
jairad26 May 28, 2024
6c25a59
Merge branch 'main' into vector-search
jairad26 May 29, 2024
2e3fc55
Merge branch 'main' into vector-search
jairad26 Jun 3, 2024
d785a8d
improve brute force search using heap
jairad26 Jun 3, 2024
f71ba58
better uid calculation
jairad26 Jun 3, 2024
121ae99
add reading and writing from wal in storage
jairad26 Jun 3, 2024
6a1c18e
add exponential backoff on read write from wal
jairad26 Jun 3, 2024
8d70fda
remove comments
jairad26 Jun 3, 2024
eae9020
redesign for new host fns
jairad26 Jun 10, 2024
b92aeaf
Merge branch 'main' into vector-search
jairad26 Jun 10, 2024
f1e2f0e
add index actions: create & remove
jairad26 Jun 10, 2024
91b8f60
final reimplementation
jairad26 Jun 13, 2024
4377cc1
Merge branch 'main' into vector-search
jairad26 Jun 13, 2024
0b648b4
Merge branch 'main' into vector-search
jairad26 Jun 13, 2024
7ba7988
use helper functions:
jairad26 Jun 13, 2024
066c0f6
add support for auto indexing
jairad26 Jun 13, 2024
2aa1bbc
lint
jairad26 Jun 13, 2024
0c790a2
rename some errors, better output to user on failure
jairad26 Jun 14, 2024
32ea212
remove options and constraints until necessary
jairad26 Jun 14, 2024
610fb94
refactoring & graceful shutdown and restart
jairad26 Jun 14, 2024
352ea2f
more robust reloading mechanism
jairad26 Jun 14, 2024
802a11e
remove some comments
jairad26 Jun 14, 2024
21be83d
remove more extraneous code
jairad26 Jun 14, 2024
dbbd987
hardcoded strings to variables
jairad26 Jun 14, 2024
b31c02b
Merge branch 'main' into vector-search
jairad26 Jun 14, 2024
39f3a8f
changelog & use new readparams nullable fix
jairad26 Jun 14, 2024
e0a5695
add comments about future expansion
jairad26 Jun 14, 2024
25c87bb
rename vector to collections
jairad26 Jun 17, 2024
e1ae84d
more renaming, move recompute search method to own type
jairad26 Jun 17, 2024
c5f9eda
move to f32, add better casting and error handling
jairad26 Jun 17, 2024
ad2749c
change scoring to f64
jairad26 Jun 17, 2024
45c2a43
add getTexts for collection
jairad26 Jun 18, 2024
b2be8a8
fix some panics, add getTexts
jairad26 Jun 18, 2024
612b4e7
move indexing from manifest to background process
jairad26 Jun 18, 2024
2047142
changed log
jairad26 Jun 18, 2024
b32f0bb
Merge branch 'main' into vector-search
jairad26 Jun 19, 2024
98faec6
Merge branch 'main' into vector-search
jairad26 Jun 19, 2024
3cef2e4
revisions
jairad26 Jun 19, 2024
afce681
add callback on manifest
jairad26 Jun 19, 2024
108cd50
fix callback
jairad26 Jun 19, 2024
2cef4c8
use global variable
jairad26 Jun 19, 2024
b26808d
move plugins and module functions to modules package
jairad26 Jun 19, 2024
b208be3
Merge branch 'fix-wasmhost-package' into vector-search
jairad26 Jun 19, 2024
4c284ee
change name to callFunctionWithCallInfo
jairad26 Jun 19, 2024
5f2fa8d
Merge branch 'fix-wasmhost-package' into vector-search
jairad26 Jun 19, 2024
171586e
move worker to inside of channel for loop
jairad26 Jun 19, 2024
0bdddb4
lint
jairad26 Jun 19, 2024
f364ed4
Merge branch 'fix-wasmhost-package' into vector-search
jairad26 Jun 20, 2024
1375712
add redis
jairad26 Jun 20, 2024
448afae
fix embedder req
jairad26 Jun 20, 2024
2a4c953
Merge branch 'vector-search' into vector-search-redis
jairad26 Jun 20, 2024
b994e03
vector index with redis
jairad26 Jun 20, 2024
daccd3c
postgres support
jairad26 Jun 21, 2024
79db236
Merge branch 'main' into vector-search-postgres
jairad26 Jun 21, 2024
f486345
revert some unnecessary changes
jairad26 Jun 21, 2024
e85bf8a
lint
jairad26 Jun 21, 2024
37d15f1
fix collections.go
jairad26 Jun 21, 2024
3d42676
add support for removing indexes from db
jairad26 Jun 21, 2024
7259ec9
Merge branch 'main' into vector-search-postgres
jairad26 Jun 21, 2024
c53f4a8
bugs
jairad26 Jun 21, 2024
8a7aa9a
revisions
jairad26 Jun 21, 2024
9f0b697
fix panic on collectionName not existing
jairad26 Jun 21, 2024
4091bc4
revs
jairad26 Jun 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## UNRELEASED

- Add sequential vector search [#202](https://github.com/hypermodeAI/runtime/pull/202)
- Add nullable check in ReadString [#228](https://github.com/hypermodeAI/runtime/pull/228)
- Lowercase model name before invoking for hypermode hosted models [#221](https://github.com/gohypermode/runtime/pull/221)
- Improve HTTP error messages [#222](https://github.com/gohypermode/runtime/pull/222)
Expand Down
222 changes: 222 additions & 0 deletions collections/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package collections

import (
"context"
"errors"
"fmt"

"hmruntime/collections/index/interfaces"
"hmruntime/db"
"hmruntime/logger"
"hmruntime/manifestdata"
"hmruntime/pluginmanager"
"hmruntime/plugins"

"sync"
"time"
)

const collectionFactoryWriteInterval = 1

var (
GlobalCollectionFactory *CollectionFactory
ErrCollectionNotFound = fmt.Errorf("collection not found")
)

func InitializeIndexFactory(ctx context.Context) {
GlobalCollectionFactory = CreateFactory()
manifestdata.RegisterManifestLoadedCallback(CleanAndProcessManifest)
pluginmanager.RegisterPluginLoadedCallback(func(ctx context.Context, metadata plugins.PluginMetadata) error {
CatchEmbedderReqs(ctx)
return nil
})
go GlobalCollectionFactory.worker(ctx)
}

func CloseIndexFactory(ctx context.Context) {
close(GlobalCollectionFactory.quit)
<-GlobalCollectionFactory.done
}

type CollectionFactory struct {
collectionMap map[string]interfaces.Collection
mu sync.RWMutex
quit chan struct{}
done chan struct{}
}

func (tif *CollectionFactory) worker(ctx context.Context) {
defer close(tif.done)
ticker := time.NewTicker(collectionFactoryWriteInterval * time.Minute)

defer ticker.Stop()
for {
select {
case <-ticker.C:
// read from postgres all collections & searchMethod after lastInsertedID
tif.ReadFromPostgres(ctx)
case <-tif.quit:
return
}
}
}

func CreateFactory() *CollectionFactory {
f := &CollectionFactory{
collectionMap: map[string]interfaces.Collection{},
quit: make(chan struct{}),
done: make(chan struct{}),
}
return f
}

func (hf *CollectionFactory) isNameAvailableWithLock(name string) bool {
_, nameUsed := hf.collectionMap[name]
return !nameUsed
}

func (hf *CollectionFactory) Create(
ctx context.Context,
name string,
index interfaces.Collection) (interfaces.Collection, error) {
hf.mu.Lock()
defer hf.mu.Unlock()
return hf.createWithLock(name, index)
}

func (hf *CollectionFactory) createWithLock(
name string,
index interfaces.Collection) (interfaces.Collection, error) {
if !hf.isNameAvailableWithLock(name) {
err := errors.New("index with name " + name + " already exists")
return nil, err
}
retVal := index
hf.collectionMap[name] = retVal
return retVal, nil
}

func (hf *CollectionFactory) GetCollectionMap() map[string]interfaces.Collection {
return hf.collectionMap
}

func (hf *CollectionFactory) Find(ctx context.Context, name string) (interfaces.Collection, error) {
hf.mu.RLock()
defer hf.mu.RUnlock()
return hf.findWithLock(name)
}

func (hf *CollectionFactory) findWithLock(name string) (interfaces.Collection, error) {
vecInd, ok := hf.collectionMap[name]
if !ok {
return nil, ErrCollectionNotFound
}
return vecInd, nil
}

func (hf *CollectionFactory) Remove(ctx context.Context, name string) error {
hf.mu.Lock()
defer hf.mu.Unlock()
err := db.DeleteCollectionTexts(ctx, name)
if err != nil {
return err
}
return hf.removeWithLock(name)
}

func (hf *CollectionFactory) removeWithLock(name string) error {
delete(hf.collectionMap, name)
return nil
}

func (hf *CollectionFactory) CreateOrReplace(
ctx context.Context,
name string,
index interfaces.Collection) (interfaces.Collection, error) {
hf.mu.Lock()
defer hf.mu.Unlock()
vi, err := hf.findWithLock(name)
if err != nil {
return nil, err
}
if vi != nil {
err = hf.removeWithLock(name)
if err != nil {
return nil, err
}
}
return hf.createWithLock(name, index)
}

func (hf *CollectionFactory) ReadFromPostgres(ctx context.Context) {
for _, collection := range hf.collectionMap {
err := LoadTextsIntoCollection(ctx, collection)
if err != nil {
logger.Err(ctx, err).
Str("collection_name", collection.GetCollectionName()).
Msg("Failed to load texts into collection.")
}

for _, vectorIndex := range collection.GetVectorIndexMap() {
err = LoadVectorsIntoVectorIndex(ctx, vectorIndex, collection)
if err != nil {
logger.Err(ctx, err).
Str("collection_name", collection.GetCollectionName()).
Str("search_method", vectorIndex.GetSearchMethodName()).
Msg("Failed to load vectors into vector index.")
}
}
}
}

func LoadTextsIntoCollection(ctx context.Context, collection interfaces.Collection) error {
// Get checkpoint id for collection
textCheckpointId, err := collection.GetCheckpointId(ctx)
if err != nil {
return err
}

// Query all texts from checkpoint
textIds, keys, texts, err := db.QueryCollectionTextsFromCheckpoint(ctx, collection.GetCollectionName(), textCheckpointId)
if err != nil {
return err
}
if len(textIds) != len(keys) || len(keys) != len(texts) {
return errors.New("mismatch in keys and texts")
}

// Insert all texts into collection
for i := range textIds {
err = collection.InsertTextToMemory(ctx, textIds[i], keys[i], texts[i])
if err != nil {
return err
}
}
return nil
}

func LoadVectorsIntoVectorIndex(ctx context.Context, vectorIndex *interfaces.VectorIndexWrapper, collection interfaces.Collection) error {
// Get checkpoint id for vector index
vecCheckpointId, err := vectorIndex.GetCheckpointId(ctx)
if err != nil {
return err
}

// Query all vectors from checkpoint
vectorIds, keys, vectors, err := db.QueryCollectionVectorsFromCheckpoint(ctx, collection.GetCollectionName(), vectorIndex.GetSearchMethodName(), vecCheckpointId)
if err != nil {
return err
}
if len(vectorIds) != len(vectors) || len(keys) != len(vectors) {
return errors.New("mismatch in keys and vectors")
}

// Insert all vectors into vector index
for i := range vectorIds {
err = vectorIndex.InsertVectorToMemory(ctx, vectorIds[i], keys[i], vectors[i])
if err != nil {
return err
}
}
return nil
}
124 changes: 124 additions & 0 deletions collections/in_mem/sequential/vector_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sequential

import (
"container/heap"
"context"
"hmruntime/collections/index"
"hmruntime/collections/utils"
"hmruntime/db"
"sync"
)

const (
SequentialVectorIndexType = "SequentialVectorIndex"
)

type SequentialVectorIndex struct {
mu sync.RWMutex
searchMethodName string
lastInsertedID int64
VectorMap map[string][]float32 // key: vector
}

func NewSequentialVectorIndex(collection, searchMethod string) *SequentialVectorIndex {
return &SequentialVectorIndex{
searchMethodName: searchMethod,
VectorMap: make(map[string][]float32),
}
}

func (ims *SequentialVectorIndex) GetSearchMethodName() string {
return ims.searchMethodName
}

func (ims *SequentialVectorIndex) GetVectorNodesMap() map[string][]float32 {
ims.mu.RLock()
defer ims.mu.RUnlock()
return ims.VectorMap
}

func (ims *SequentialVectorIndex) Search(ctx context.Context, query []float32, maxResults int, filter index.SearchFilter) (utils.MinTupleHeap, error) {
// calculate cosine similarity and return top maxResults results
ims.mu.RLock()
defer ims.mu.RUnlock()
var results utils.MinTupleHeap
heap.Init(&results)
for key, vector := range ims.VectorMap {
if filter != nil && !filter(query, vector, key) {
continue
}
similarity, err := utils.CosineSimilarity(query, vector)
if err != nil {
return nil, err
}
if results.Len() < maxResults {
heap.Push(&results, utils.InitHeapElement(similarity, key, false))
} else if utils.IsBetterScoreForSimilarity(similarity, results[0].GetValue()) {
heap.Pop(&results)
heap.Push(&results, utils.InitHeapElement(similarity, key, false))
}
}

// Return top maxResults results
var finalResults utils.MinTupleHeap
for results.Len() > 0 {
finalResults = append(finalResults, heap.Pop(&results).(utils.MinHeapElement))
}
// Reverse the finalResults to get the highest similarity first
for i, j := 0, len(finalResults)-1; i < j; i, j = i+1, j-1 {
finalResults[i], finalResults[j] = finalResults[j], finalResults[i]
}
return finalResults, nil
}

func (ims *SequentialVectorIndex) SearchWithKey(ctx context.Context, queryKey string, maxResults int, filter index.SearchFilter) (utils.MinTupleHeap, error) {
ims.mu.RLock()
query := ims.VectorMap[queryKey]
ims.mu.RUnlock()
if query == nil {
return nil, nil
}
return ims.Search(ctx, query, maxResults, filter)
}

func (ims *SequentialVectorIndex) InsertVector(ctx context.Context, textId int64, vec []float32) error {

vectorId, key, err := db.WriteCollectionVector(ctx, ims.searchMethodName, textId, vec)
if err != nil {
return err
}

return ims.InsertVectorToMemory(ctx, vectorId, key, vec)

}

func (ims *SequentialVectorIndex) InsertVectorToMemory(ctx context.Context, vectorId int64, key string, vec []float32) error {
ims.mu.Lock()
defer ims.mu.Unlock()
ims.VectorMap[key] = vec
ims.lastInsertedID = vectorId
return nil
}

func (ims *SequentialVectorIndex) DeleteVector(ctx context.Context, textId int64, key string) error {
ims.mu.Lock()
defer ims.mu.Unlock()
err := db.DeleteCollectionVector(ctx, ims.searchMethodName, textId)
if err != nil {
return err
}
delete(ims.VectorMap, key)
return nil
}

func (ims *SequentialVectorIndex) GetVector(ctx context.Context, key string) ([]float32, error) {
ims.mu.RLock()
defer ims.mu.RUnlock()
return ims.VectorMap[key], nil
}

func (ims *SequentialVectorIndex) GetCheckpointId(ctx context.Context) (int64, error) {
ims.mu.RLock()
defer ims.mu.RUnlock()
return ims.lastInsertedID, nil
}
Loading