diff --git a/go/libraries/doltcore/remotestorage/chunk_cache.go b/go/libraries/doltcore/remotestorage/chunk_cache.go
index 6345981b3e..f5175b69eb 100644
--- a/go/libraries/doltcore/remotestorage/chunk_cache.go
+++ b/go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -19,22 +19,21 @@ import (
 	"github.com/dolthub/dolt/go/store/nbs"
 )
 
-// ChunkCache is an interface used for caching chunks
+// ChunkCache is an interface used for caching chunks and has-presence
+// records that have already been fetched from remotestorage. Care should
+// be taken when using ChunkCache if it is possible for the remote to GC,
+// since in that case the cache could contain stale data.
 type ChunkCache interface {
-	// Put puts a slice of chunks into the cache. Error returned if the cache capacity has been exceeded.
-	Put(c []nbs.ToChunker) error
+	// Insert some observed / fetched chunks into the cache. These
+	// chunks may or may not be returned in the future.
+	InsertChunks(cs []nbs.ToChunker)
+	// Get previously cached chunks, if they are still available.
+	GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker
 
-	// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
-	// is put in it's place
-	Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker
-
-	// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
-	Has(h hash.HashSet) (absent hash.HashSet)
-
-	// PutChunk puts a single chunk in the cache. Returns an error if the cache capacity has been exceeded.
-	PutChunk(chunk nbs.ToChunker) error
-
-	// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
-	// between the last time GetAndClearChunksToFlush was called and now.
-	GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
+	// Insert all hashes in |h| as existing in the remote.
+	InsertHas(h hash.HashSet)
+	// Returns the absent set from |h|, filtering it by records
+	// which are known to be present in the remote based on
+	// previous |InsertHas| calls.
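Reviewer note: the new interface is purely advisory on both the chunk and presence paths, so callers own the miss handling. A minimal sketch of the cache-aside flow this implies, before the interface's final method `GetCachedHas` below; `getWithCache` and its `fetchFromRemote` callback are hypothetical names, not part of this change:

```go
package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// Hypothetical sketch: read through the cache, fetch only the misses,
// and opportunistically cache whatever comes back.
func getWithCache(cache ChunkCache, want hash.HashSet,
	fetchFromRemote func(hash.HashSet) []nbs.ToChunker) map[hash.Hash]nbs.ToChunker {
	res := make(map[hash.Hash]nbs.ToChunker, len(want))
	for h, c := range cache.GetCachedChunks(want) { // hits only; no empty placeholders anymore
		res[h] = c
	}
	miss := make(hash.HashSet)
	for h := range want {
		if _, ok := res[h]; !ok {
			miss.Insert(h)
		}
	}
	if len(miss) > 0 {
		fetched := fetchFromRemote(miss)
		cache.InsertChunks(fetched) // best effort; entries may later be evicted
		for _, c := range fetched {
			res[c.Hash()] = c
		}
	}
	return res
}
```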
+ GetCachedHas(h hash.HashSet) (absent hash.HashSet) } diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index b2f8a0ee17..4fed21ffb4 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -46,7 +46,7 @@ import ( "github.com/dolthub/dolt/go/store/types" ) -var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached") +var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached") var ErrUploadFailed = errors.New("upload failed") @@ -123,6 +123,7 @@ type DoltChunkStore struct { root hash.Hash csClient remotesapi.ChunkStoreServiceClient finalizer func() error + wb WriteBuffer cache ChunkCache metadata *remotesapi.GetRepoMetadataResponse nbf *types.NomsBinFormat @@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa csClient: csClient, finalizer: func() error { return nil }, cache: newMapChunkCache(), + wb: newMapWriteBuffer(), metadata: metadata, nbf: nbf, httpFetcher: globalHttpFetcher, @@ -185,79 +187,46 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa return cs, nil } +func (dcs *DoltChunkStore) clone() *DoltChunkStore { + ret := *dcs + ret.repoToken = new(atomic.Value) + return &ret +} + func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: dcs.cache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: fetcher, - params: dcs.params, - stats: dcs.stats, - } + ret := dcs.clone() + ret.httpFetcher = fetcher + return ret +} + +func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore { + ret := dcs.clone() + ret.wb = noopWriteBuffer{} + return ret +} + +func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore { + ret := dcs.clone() + ret.wb = wb + return ret } func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: noopChunkCache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: dcs.httpFetcher, - params: dcs.params, - stats: dcs.stats, - logger: dcs.logger, - } + ret := dcs.clone() + ret.cache = noopChunkCache + return ret } func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: cache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: dcs.httpFetcher, - params: dcs.params, - stats: dcs.stats, - logger: dcs.logger, - } + ret := dcs.clone() + ret.cache = cache + return ret } func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: dcs.cache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: dcs.httpFetcher, - params: params, - stats: dcs.stats, 
- logger: dcs.logger, - } + ret := dcs.clone() + ret.params = params + return ret } func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) { @@ -344,19 +313,18 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed") defer span.End() - hashToChunk := dcs.cache.Get(hashes) + hashToChunk := dcs.cache.GetCachedChunks(hashes) + dcs.wb.AddBufferedChunks(hashes, hashToChunk) span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk))) atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk))) notCached := make([]hash.Hash, 0, len(hashes)) for h := range hashes { - c := hashToChunk[h] - - if c == nil || c.IsEmpty() { - notCached = append(notCached, h) - } else { + if c, ok := hashToChunk[h]; ok { found(ctx, c) + } else { + notCached = append(notCached, h) } } @@ -633,9 +601,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash } // Don't forward on empty/not found chunks. if !cc.IsEmpty() { - if err := dcs.cache.PutChunk(cc); err != nil { - return err - } + dcs.cache.InsertChunks([]nbs.ToChunker{cc}) found(egCtx, cc) } } @@ -663,8 +629,8 @@ const maxHasManyBatchSize = 16 * 1024 // absent from the store. func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { // get the set of hashes that isn't already in the cache - notCached := dcs.cache.Has(hashes) - + notCached := dcs.cache.GetCachedHas(hashes) + dcs.wb.RemovePresentChunks(notCached) if len(notCached) == 0 { return notCached, nil } @@ -673,7 +639,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha hashSl, byteSl := HashSetToSlices(notCached) absent := make(hash.HashSet) - var found []nbs.ToChunker + found := make(hash.HashSet) var err error batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) { @@ -714,8 +680,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha absent[currHash] = struct{}{} j++ } else { - c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{})) - found = append(found, c) + found.Insert(currHash) } } @@ -731,9 +696,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha } if len(found) > 0 { - if err := dcs.cache.Put(found); err != nil { - return hash.HashSet{}, err - } + dcs.cache.InsertHas(found) } return absent, nil @@ -767,7 +730,8 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu } cc := nbs.ChunkToCompressedChunk(c) - if err := dcs.cache.Put([]nbs.ToChunker{cc}); err != nil { + err = dcs.wb.Put(cc) + if err != nil { return err } return nil @@ -850,8 +814,26 @@ func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error { // persisted root hash from last to current (or keeps it the same). // If last doesn't match the root in persistent storage, returns false. func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) { - hashToChunkCount, err := dcs.uploadChunks(ctx) + toUpload := dcs.wb.GetAllForWrite() + var resp *remotesapi.CommitResponse + defer func() { + // We record success based on the CommitResponse + // |Success| field, which is only |true| when the call + // successfully updated the root hash of the + // remote. 
With the current API, we cannot distinguish
+	// the case where the commit failed because |last| was
+	// stale but the provided chunks were still
+	// successfully added to the remote. If the write is
+	// retried in such a case, we will currently write the
+	// chunks to the remote again.
+	if resp != nil {
+		dcs.wb.WriteCompleted(resp.Success)
+	} else {
+		dcs.wb.WriteCompleted(false)
+	}
+	}()
+	hashToChunkCount, err := dcs.uploadChunks(ctx, toUpload)
 	if err != nil {
 		return false, err
 	}
@@ -873,7 +855,7 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
 			NbsVersion: nbs.StorageVersion,
 		},
 	}
-	resp, err := dcs.csClient.Commit(ctx, req)
+	resp, err = dcs.csClient.Commit(ctx, req)
 	if err != nil {
 		return false, NewRpcError(err, "Commit", dcs.host, req)
 	}
@@ -911,10 +893,11 @@ func (dcs *DoltChunkStore) Close() error {
 	return dcs.finalizer()
 }
 
-// getting this working using the simplest approach first
-func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
-	hashToChunk := dcs.cache.GetAndClearChunksToFlush()
-
+// Uploads all chunks in |hashToChunk| to the remote store and returns
+// the manifest entries that correspond to the new table files. Used
+// by |Commit|. Typically |hashToChunk| will have come from our |wb|
+// WriteBuffer.
+func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[hash.Hash]nbs.CompressedChunk) (map[hash.Hash]int, error) {
 	if len(hashToChunk) == 0 {
 		return map[hash.Hash]int{}, nil
 	}
@@ -922,7 +905,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
 	chnks := make([]chunks.Chunk, 0, len(hashToChunk))
 	for _, chable := range hashToChunk {
 		ch, err := chable.ToChunk()
-
 		if err != nil {
 			return nil, err
 		}
@@ -951,7 +933,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
 	}
 
 	for h, contentHash := range hashToContentHash {
-		// Can parallelize this in the future if needed
 		err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
 			data := hashToData[h]
 			return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
diff --git a/go/libraries/doltcore/remotestorage/map_chunk_cache.go b/go/libraries/doltcore/remotestorage/map_chunk_cache.go
index 510ca54469..f8db4cf51a 100644
--- a/go/libraries/doltcore/remotestorage/map_chunk_cache.go
+++ b/go/libraries/doltcore/remotestorage/map_chunk_cache.go
@@ -15,134 +15,70 @@
 package remotestorage
 
 import (
-	"sync"
+	lru "github.com/hashicorp/golang-lru/v2"
 
 	"github.com/dolthub/dolt/go/store/hash"
 	"github.com/dolthub/dolt/go/store/nbs"
 )
 
-const (
-	// averageChunkSize is used to estimate the size of chunk for purposes of avoiding excessive memory usage
-	averageChunkSize = 1 << 12
-)
-
-// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
+// mapChunkCache is a simple ChunkCache implementation that stores
+// cached chunks and has-records in two separate LRU caches.
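Reviewer note on the new dependency used by the type just below: golang-lru's 2Q cache tracks recently used and frequently used entries in separate queues, which makes it more resistant to one-off scans than a plain LRU. A minimal sketch of the small API surface this change relies on; the key/value types and sizes here are arbitrary:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// New2Q fails only for invalid (e.g. non-positive) sizes, which is
	// why NewMapChunkCacheWithCapacity treats an error as a panic.
	cache, err := lru.New2Q[string, int](128)
	if err != nil {
		panic(err)
	}
	cache.Add("a", 1)
	v, ok := cache.Get("a")
	fmt.Println(v, ok) // 1 true
	// Contains checks membership without updating recency.
	fmt.Println(cache.Contains("b")) // false
}
```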
type mapChunkCache struct { - mu *sync.Mutex - hashToChunk map[hash.Hash]nbs.ToChunker - toFlush map[hash.Hash]nbs.ToChunker - cm CapacityMonitor + chunks *lru.TwoQueueCache[hash.Hash, nbs.ToChunker] + has *lru.TwoQueueCache[hash.Hash, struct{}] } +const defaultCacheChunkCapacity = 32 * 1024 +const defaultCacheHasCapacity = 1024 * 1024 + func newMapChunkCache() *mapChunkCache { - return &mapChunkCache{ - &sync.Mutex{}, - make(map[hash.Hash]nbs.ToChunker), - make(map[hash.Hash]nbs.ToChunker), - NewUncappedCapacityMonitor(), - } + return NewMapChunkCacheWithCapacity(defaultCacheChunkCapacity, defaultCacheHasCapacity) } -// used by DoltHub API -func NewMapChunkCacheWithMaxCapacity(maxCapacity int64) *mapChunkCache { +func NewMapChunkCacheWithCapacity(maxChunkCapacity, maxHasCapacity int) *mapChunkCache { + chunks, err := lru.New2Q[hash.Hash, nbs.ToChunker](maxChunkCapacity) + if err != nil { + panic(err) + } + has, err := lru.New2Q[hash.Hash, struct{}](maxHasCapacity) + if err != nil { + panic(err) + } return &mapChunkCache{ - &sync.Mutex{}, - make(map[hash.Hash]nbs.ToChunker), - make(map[hash.Hash]nbs.ToChunker), - NewFixedCapacityMonitor(maxCapacity), + chunks, + has, } } -// Put puts a slice of chunks into the cache. Returns an error if the cache capacity has been exceeded. -func (mcc *mapChunkCache) Put(chnks []nbs.ToChunker) error { - mcc.mu.Lock() - defer mcc.mu.Unlock() - - for i := 0; i < len(chnks); i++ { - c := chnks[i] - h := c.Hash() - - if curr, ok := mcc.hashToChunk[h]; ok { - if !curr.IsEmpty() { - continue - } - } - - if mcc.cm.CapacityExceeded(averageChunkSize) { - return ErrCacheCapacityExceeded - } - - mcc.hashToChunk[h] = c - - if !c.IsEmpty() { - mcc.toFlush[h] = c - } +func (cache *mapChunkCache) InsertChunks(cs []nbs.ToChunker) { + for _, c := range cs { + cache.chunks.Add(c.Hash(), c) } - - return nil } -// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty. -// is put in it's place -func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker { - hashToChunk := make(map[hash.Hash]nbs.ToChunker) - - mcc.mu.Lock() - defer mcc.mu.Unlock() - - for h := range hashes { - if c, ok := mcc.hashToChunk[h]; ok { - hashToChunk[h] = c - } else { - hashToChunk[h] = nbs.EmptyCompressedChunk +func (cache *mapChunkCache) GetCachedChunks(hs hash.HashSet) map[hash.Hash]nbs.ToChunker { + ret := make(map[hash.Hash]nbs.ToChunker) + for h := range hs { + c, ok := cache.chunks.Get(h) + if ok { + ret[h] = c } } - - return hashToChunk + return ret } -// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it. 
-func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) { - absent = make(hash.HashSet) - - mcc.mu.Lock() - defer mcc.mu.Unlock() - - for h := range hashes { - if _, ok := mcc.hashToChunk[h]; !ok { - absent[h] = struct{}{} - } +func (cache *mapChunkCache) InsertHas(hs hash.HashSet) { + for h := range hs { + cache.has.Add(h, struct{}{}) } - - return absent } -func (mcc *mapChunkCache) PutChunk(ch nbs.ToChunker) error { - mcc.mu.Lock() - defer mcc.mu.Unlock() - - h := ch.Hash() - if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() { - if mcc.cm.CapacityExceeded(averageChunkSize) { - return ErrCacheCapacityExceeded +func (cache *mapChunkCache) GetCachedHas(hs hash.HashSet) (absent hash.HashSet) { + ret := make(hash.HashSet) + for h := range hs { + if !cache.has.Contains(h) { + ret.Insert(h) } - mcc.hashToChunk[h] = ch - mcc.toFlush[h] = ch } - - return nil -} - -// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache -// between the last time GetAndClearChunksToFlush was called and now. -func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker { - newToFlush := make(map[hash.Hash]nbs.ToChunker) - - mcc.mu.Lock() - defer mcc.mu.Unlock() - - toFlush := mcc.toFlush - mcc.toFlush = newToFlush - - return toFlush + return ret } diff --git a/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go b/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go index 8cb5e118d1..62aba44ea9 100644 --- a/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go +++ b/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go @@ -15,10 +15,8 @@ package remotestorage import ( - "math/rand" - "reflect" + "math/rand/v2" "testing" - "time" "github.com/stretchr/testify/assert" @@ -27,68 +25,111 @@ import ( "github.com/dolthub/dolt/go/store/nbs" ) -func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.ToChunker) { - chks := make([]nbs.ToChunker, n) - hashes := make(hash.HashSet) - for i := 0; i < n; i++ { - size := int(rng.Int31n(99) + 1) - bytes := make([]byte, size) - for j := 0; j < size; j++ { - bytes[j] = byte(rng.Int31n(255)) - } - - chk := nbs.ChunkToCompressedChunk(chunks.NewChunk(bytes)) - chks[i] = chk - - hashes[chk.Hash()] = struct{}{} - } - - return hashes, chks -} - func TestMapChunkCache(t *testing.T) { - const chunkBatchSize = 10 - - seed := time.Now().UnixNano() - rng := rand.New(rand.NewSource(seed)) - hashes, chks := genRandomChunks(rng, chunkBatchSize) - - mapChunkCache := newMapChunkCache() - mapChunkCache.Put(chks) - hashToChunk := mapChunkCache.Get(hashes) - - assert.Equal(t, len(hashToChunk), chunkBatchSize, "Did not read back all chunks (seed %d)", seed) - - absent := mapChunkCache.Has(hashes) - - assert.Equal(t, len(absent), 0, "Missing chunks that were added (seed %d)", seed) - - toFlush := mapChunkCache.GetAndClearChunksToFlush() - - assert.True(t, reflect.DeepEqual(toFlush, hashToChunk), "unexpected or missing chunks to flush (seed %d)", seed) - - moreHashes, moreChks := genRandomChunks(rng, chunkBatchSize) - - joinedHashes := make(hash.HashSet) - - for h := range hashes { - joinedHashes[h] = struct{}{} - } - - for h := range moreHashes { - joinedHashes[h] = struct{}{} - } + t.Run("New", func(t *testing.T) { + assert.NotNil(t, newMapChunkCache()) + assert.NotNil(t, NewMapChunkCacheWithCapacity(32, 32)) + assert.Panics(t, func() { + assert.NotNil(t, NewMapChunkCacheWithCapacity(-1, 32)) + }) + assert.Panics(t, func() { + assert.NotNil(t, 
NewMapChunkCacheWithCapacity(32, -1)) + }) + }) + t.Run("CachesChunks", func(t *testing.T) { + var seed [32]byte + rand := rand.NewChaCha8(seed) + cache := NewMapChunkCacheWithCapacity(8, 8) + inserted := make(hash.HashSet) + // Insert some chunks. + for i := 0; i < 8; i++ { + bs := make([]byte, 512) + rand.Read(bs) + chk := chunks.NewChunk(bs) + inserted.Insert(chk.Hash()) + cc := nbs.ChunkToCompressedChunk(chk) + cache.InsertChunks([]nbs.ToChunker{cc}) + } - absent = mapChunkCache.Has(joinedHashes) + // Query for those chunks, plus some that were not inserted. + query := make(hash.HashSet) + for h := range inserted { + query.Insert(h) + } + for i := 0; i < 8; i++ { + var bs [512]byte + rand.Read(bs[:]) + query.Insert(hash.Of(bs[:])) + } - assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed) + // Only got back the inserted chunks... + cached := cache.GetCachedChunks(query) + assert.Len(t, cached, 8) + + // If we insert more than our max size, and query + // for everything inserted, we get back a result + // set matching our max size. + for i := 0; i < 64; i++ { + bs := make([]byte, 512) + rand.Read(bs) + chk := chunks.NewChunk(bs) + inserted.Insert(chk.Hash()) + cc := nbs.ChunkToCompressedChunk(chk) + cache.InsertChunks([]nbs.ToChunker{cc}) + } + cached = cache.GetCachedChunks(inserted) + assert.Len(t, cached, 8) + }) + t.Run("CachesHasRecords", func(t *testing.T) { + var seed [32]byte + rand := rand.NewChaCha8(seed) + cache := NewMapChunkCacheWithCapacity(8, 8) + query := make(hash.HashSet) + for i := 0; i < 64; i++ { + var bs [512]byte + rand.Read(bs[:]) + query.Insert(hash.Of(bs[:])) + } - mapChunkCache.PutChunk(chks[0]) - mapChunkCache.PutChunk(moreChks[0]) + // Querying an empty cache returns all the hashes. + res := cache.GetCachedHas(query) + assert.NotSame(t, res, query) + assert.Len(t, res, 64) + for h := range query { + _, ok := res[h] + assert.True(t, ok, "everything in query is in res") + } - toFlush = mapChunkCache.GetAndClearChunksToFlush() + // Insert 8 of our query hashes into the cache. + insert := make(hash.HashSet) + insertTwo := make(hash.HashSet) + i := 0 + for h := range query { + if i < 8 { + insert.Insert(h) + } else { + insertTwo.Insert(h) + } + i += 1 + if i == 16 { + break + } + } + cache.InsertHas(insert) + + // Querying our original query set returns expected results. + res = cache.GetCachedHas(query) + assert.Len(t, res, 64-8) + for h := range query { + if _, ok := insert[h]; !ok { + _, ok = res[h] + assert.True(t, ok, "everything in query that is not in insert is in res") + } + } - expected := map[hash.Hash]nbs.ToChunker{moreChks[0].Hash(): moreChks[0]} - eq := reflect.DeepEqual(toFlush, expected) - assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed) + // Inserting another 8 hashes hits max limit. Only 8 records cached. + cache.InsertHas(insertTwo) + res = cache.GetCachedHas(query) + assert.Len(t, res, 64-8) + }) } diff --git a/go/libraries/doltcore/remotestorage/noop_chunk_cache.go b/go/libraries/doltcore/remotestorage/noop_chunk_cache.go index b8848ef771..f48142fbe6 100644 --- a/go/libraries/doltcore/remotestorage/noop_chunk_cache.go +++ b/go/libraries/doltcore/remotestorage/noop_chunk_cache.go @@ -20,31 +20,22 @@ import ( ) // noopChunkCache is a ChunkCache implementation that stores nothing -// ever. 
Using a noopChunkCache with a remotestore.DoltChunkStore -// will cause the DoltChunkStore to behave incorrectly when _writing_ -// dolt repositories; this should only be used for read-only use -// cases. +// ever. This causes all fetches to go to the remote server. var noopChunkCache = &noopChunkCacheImpl{} type noopChunkCacheImpl struct { } -func (*noopChunkCacheImpl) Put(chnks []nbs.ToChunker) error { - return nil -} - -func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker { - return make(map[hash.Hash]nbs.ToChunker) +func (*noopChunkCacheImpl) InsertChunks(cs []nbs.ToChunker) { } -func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) { - return hashes +func (*noopChunkCacheImpl) GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker { + return nil } -func (*noopChunkCacheImpl) PutChunk(ch nbs.ToChunker) error { - return nil +func (*noopChunkCacheImpl) InsertHas(h hash.HashSet) { } -func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker { - panic("noopChunkCache does not support GetAndClearChunksToFlush().") +func (*noopChunkCacheImpl) GetCachedHas(h hash.HashSet) (absent hash.HashSet) { + return h } diff --git a/go/libraries/doltcore/remotestorage/writebuffer.go b/go/libraries/doltcore/remotestorage/writebuffer.go new file mode 100644 index 0000000000..1187d57a7e --- /dev/null +++ b/go/libraries/doltcore/remotestorage/writebuffer.go @@ -0,0 +1,163 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotestorage + +import ( + "errors" + "sync" + + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/nbs" +) + +type WriteBuffer interface { + // Add a compressed chunk to the write buffer. It will be + // returned from future calls to |GetAllForWrite| until a + // write is successful. + Put(nbs.CompressedChunk) error + + // Returns the current set of written chunks. After this + // returns, concurrent calls to other methods may block until + // |WriteCompleted| is called. Calls to |GetAllForWrite| must + // be followed by a call to |WriteCompleted| once the write + // attempt is finished. + GetAllForWrite() map[hash.Hash]nbs.CompressedChunk + + // Called after a call to |GetAllForWrite|, this records + // success or failure of the write operation. If the write + // operation was successful, then the written chunks are now + // in the upstream and they can be cleared from the write + // buffer. Otherwise, the written chunks are retained in the + // write buffer so that the write can be retried. + WriteCompleted(success bool) + + // ChunkStore clients expect to read their own writes before a + // commit. On the get path, remotestorage should add buffered + // chunks matching a given |query| to its |result|. On the + // HasMany path, remotestorage should remove present chunks + // from its absent set on the HasMany response. 
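To make the flush contract concrete before the interface's remaining read-your-writes methods below: every `GetAllForWrite` must be paired with exactly one `WriteCompleted`, even on failure, so that blocked `Put` callers are released and retained chunks can be retried. A hypothetical helper showing the pairing; `flushOnce` and its `upload` callback are illustrative stand-ins for the real table-file upload performed by `Commit`:

```go
package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// Hypothetical sketch of the GetAllForWrite/WriteCompleted pairing.
// On failure the buffer retains the chunks, so a later flushOnce call
// sees them again and the write is retried.
func flushOnce(wb WriteBuffer, upload func(map[hash.Hash]nbs.CompressedChunk) error) error {
	toWrite := wb.GetAllForWrite()
	err := upload(toWrite)
	wb.WriteCompleted(err == nil)
	return err
}
```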
+ AddBufferedChunks(query hash.HashSet, result map[hash.Hash]nbs.ToChunker) + // Removes the addresses of any buffered chunks from |hashes|. + // Used to filter the |absent| response of a HasMany call so + // that buffered chunks are not considered absent. + RemovePresentChunks(hashes hash.HashSet) +} + +type noopWriteBuffer struct { +} + +var _ WriteBuffer = noopWriteBuffer{} + +func (noopWriteBuffer) Put(nbs.CompressedChunk) error { + return errors.New("unsupported operation: write on a read-only remotestorage chunk store") +} + +func (noopWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk { + panic("attempt to upload chunks on a read-only remotestorage chunk store") +} + +func (noopWriteBuffer) WriteCompleted(success bool) { + panic("call to WriteCompleted on a noopWriteBuffer") +} + +func (noopWriteBuffer) AddBufferedChunks(hash.HashSet, map[hash.Hash]nbs.ToChunker) { +} + +func (noopWriteBuffer) RemovePresentChunks(hash.HashSet) { +} + +// A simple WriteBuffer which buffers unlimited data in memory and +// waits to flush it. +type mapWriteBuffer struct { + mu sync.Mutex + cond sync.Cond + // Set when an outstanding write is in progress, |Put| will + // block against this. Reset by |WriteCompleted| after the + // appropriate updates to |chunks| have been made. + writing bool + chunks map[hash.Hash]nbs.CompressedChunk +} + +func newMapWriteBuffer() *mapWriteBuffer { + ret := &mapWriteBuffer{ + chunks: make(map[hash.Hash]nbs.CompressedChunk), + } + ret.cond.L = &ret.mu + return ret +} + +func (b *mapWriteBuffer) Put(cc nbs.CompressedChunk) error { + b.mu.Lock() + defer b.mu.Unlock() + for b.writing { + b.cond.Wait() + } + b.chunks[cc.H] = cc + return nil +} + +func (b *mapWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk { + b.mu.Lock() + defer b.mu.Unlock() + for b.writing { + b.cond.Wait() + } + b.writing = true + return b.chunks +} + +func (b *mapWriteBuffer) WriteCompleted(success bool) { + b.mu.Lock() + defer b.mu.Unlock() + if !b.writing { + panic("mapWriteBuffer got WriteCompleted while no write was in progress") + } + b.writing = false + if success { + b.chunks = make(map[hash.Hash]nbs.CompressedChunk) + } + b.cond.Broadcast() +} + +func (b *mapWriteBuffer) AddBufferedChunks(hs hash.HashSet, res map[hash.Hash]nbs.ToChunker) { + b.mu.Lock() + defer b.mu.Unlock() + for h := range hs { + cc, ok := b.chunks[h] + if ok { + res[h] = cc + } + } +} + +func (b *mapWriteBuffer) RemovePresentChunks(hashes hash.HashSet) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.chunks) < len(hashes) { + for h := range b.chunks { + hashes.Remove(h) + } + } else { + var toRemove []hash.Hash + for h := range hashes { + if _, ok := b.chunks[h]; ok { + toRemove = append(toRemove, h) + } + } + for _, h := range toRemove { + hashes.Remove(h) + } + } +} diff --git a/go/libraries/doltcore/remotestorage/writebuffer_test.go b/go/libraries/doltcore/remotestorage/writebuffer_test.go new file mode 100644 index 0000000000..aac1454cb5 --- /dev/null +++ b/go/libraries/doltcore/remotestorage/writebuffer_test.go @@ -0,0 +1,154 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotestorage + +import ( + "math/rand/v2" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/nbs" +) + +func TestNoopWriteBuffer(t *testing.T) { + cache := noopWriteBuffer{} + err := cache.Put(nbs.CompressedChunk{}) + assert.NotNil(t, err) + assert.Panics(t, func() { + cache.GetAllForWrite() + }) + assert.Panics(t, func() { + cache.WriteCompleted(false) + }) + cache.AddBufferedChunks(make(hash.HashSet), make(map[hash.Hash]nbs.ToChunker)) + cache.RemovePresentChunks(make(hash.HashSet)) +} + +func TestMapWriteBuffer(t *testing.T) { + t.Run("SmokeTest", func(t *testing.T) { + // A bit of a typical usage... + cache := newMapWriteBuffer() + var seed [32]byte + rand := rand.NewChaCha8(seed) + query := make(hash.HashSet) + for i := 0; i < 64; i++ { + var bs [512]byte + rand.Read(bs[:]) + query.Insert(hash.Of(bs[:])) + } + res := make(map[hash.Hash]nbs.ToChunker) + cache.AddBufferedChunks(query, res) + assert.Len(t, res, 0) + + // Insert some chunks. + inserted := make(hash.HashSet) + for i := 0; i < 8; i++ { + bs := make([]byte, 512) + rand.Read(bs) + chk := chunks.NewChunk(bs) + cache.Put(nbs.ChunkToCompressedChunk(chk)) + inserted.Insert(chk.Hash()) + } + cache.AddBufferedChunks(query, res) + assert.Len(t, res, 0) + for h := range inserted { + query.Insert(h) + } + cache.AddBufferedChunks(query, res) + assert.Len(t, res, 8) + + cache.RemovePresentChunks(query) + assert.Len(t, query, 64) + for h := range inserted { + query.Insert(h) + } + + // Cache continues working for reads during a pending write. + toWrite := cache.GetAllForWrite() + assert.Len(t, toWrite, 8) + res = make(map[hash.Hash]nbs.ToChunker) + cache.AddBufferedChunks(query, res) + assert.Len(t, res, 8) + cache.RemovePresentChunks(query) + assert.Len(t, query, 64) + + // After a failure, chunks are still present. + cache.WriteCompleted(false) + toWrite = cache.GetAllForWrite() + assert.Len(t, toWrite, 8) + // And after a success, they are cleared. + cache.WriteCompleted(true) + toWrite = cache.GetAllForWrite() + assert.Len(t, toWrite, 0) + cache.WriteCompleted(true) + }) + t.Run("ConcurrentPuts", func(t *testing.T) { + cache := newMapWriteBuffer() + var seed [32]byte + seedRand := rand.NewChaCha8(seed) + const numThreads = 16 + var wg sync.WaitGroup + // One thread is writing and failing... 
+ wg.Add(1) + var writes int + go func() { + defer wg.Done() + for { + cached := cache.GetAllForWrite() + writes += 1 + time.Sleep(5 * time.Millisecond) + if len(cached) == numThreads*32 { + cache.WriteCompleted(false) + return + } + cache.WriteCompleted(false) + time.Sleep(100 * time.Microsecond) + } + }() + wg.Add(numThreads) + var inserted [numThreads][]hash.Hash + for i := 0; i < numThreads; i++ { + var seed [32]byte + seedRand.Read(seed[:]) + randCha := rand.NewChaCha8(seed) + go func() { + for j := 0; j < 32; j++ { + var bs [512]byte + randCha.Read(bs[:]) + chk := chunks.NewChunk(bs[:]) + cache.Put(nbs.ChunkToCompressedChunk(chk)) + inserted[i] = append(inserted[i], chk.Hash()) + } + defer wg.Done() + }() + } + wg.Wait() + // All writes failed. Let's make sure we have everything we expect. + cached := cache.GetAllForWrite() + assert.Len(t, cached, 32*numThreads) + for i := range inserted { + for _, h := range inserted[i] { + _, ok := cached[h] + assert.True(t, ok) + } + } + cache.WriteCompleted(true) + }) +}
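Taken together, the refactor splits the old dual-purpose cache into a read-only advisory ChunkCache and a WriteBuffer that preserves read-your-writes until Commit drains it. A hypothetical end-to-end sketch of the new surface; `base`, `chk`, `cur`, and `last` are illustrative, and the `getAddrs` parameter assumes the usual `chunks.GetAddrsCurry` callback type:

```go
package remotestorage

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// Hypothetical walkthrough of the refactored surface area.
func exampleFlow(ctx context.Context, base *DoltChunkStore, chk chunks.Chunk,
	getAddrs chunks.GetAddrsCurry, cur, last hash.Hash) error {
	// Read-only consumers can opt out of both caching and buffering.
	// Each With* builder now clones the store instead of copying every
	// field by hand, so newly added fields cannot be forgotten.
	readOnly := base.WithNoopChunkCache().WithNoopWriteBuffer()
	_ = readOnly

	// A written chunk lands in the write buffer...
	if err := base.Put(ctx, chk, getAddrs); err != nil {
		return err
	}
	// ...and is immediately visible to HasMany, because
	// RemovePresentChunks filters buffered chunks out of the absent set.
	if _, err := base.HasMany(ctx, hash.NewHashSet(chk.Hash())); err != nil {
		return err
	}

	// Commit drains the buffer via GetAllForWrite, uploads the chunks,
	// and reports the outcome back through WriteCompleted.
	_, err := base.Commit(ctx, cur, last)
	return err
}
```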