From 5eea56b79d1cde2734aa0e7c12adab5502078f66 Mon Sep 17 00:00:00 2001
From: Aaron Son
Date: Thu, 13 Feb 2025 13:01:57 -0800
Subject: [PATCH] go: remotestorage: chunk_store.go: Clean up ChunkCache.

When DoltChunkStore was implemented, it needed to do write buffering in
order to read its own writes and to flush table files to the upstream.
At some point, read caching and caching of HasMany results were added
onto the write buffer, creating confusion and despair.

This change separates those use cases back out.
---
 .../doltcore/remotestorage/chunk_cache.go     |  32 ++---
 .../doltcore/remotestorage/chunk_store.go     | 105 ++++++--------
 .../doltcore/remotestorage/map_chunk_cache.go | 135 ++++++------------
 .../remotestorage/map_chunk_cache_test.go     |  94 ------------
 .../remotestorage/noop_chunk_cache.go         |  23 +--
 .../doltcore/remotestorage/writebuffer.go     |  98 +++++++++++++
 6 files changed, 202 insertions(+), 285 deletions(-)
 delete mode 100644 go/libraries/doltcore/remotestorage/map_chunk_cache_test.go
 create mode 100644 go/libraries/doltcore/remotestorage/writebuffer.go

diff --git a/go/libraries/doltcore/remotestorage/chunk_cache.go b/go/libraries/doltcore/remotestorage/chunk_cache.go
index 0412b20a0ab..d4d6c879475 100644
--- a/go/libraries/doltcore/remotestorage/chunk_cache.go
+++ b/go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -19,23 +19,21 @@ import (
 	"github.com/dolthub/dolt/go/store/nbs"
 )
 
-// ChunkCache is an interface used for caching chunks
+// ChunkCache is an interface used for caching chunks and 'has' presence
+// results that have already been fetched from remotestorage. Care should
+// be taken when using ChunkCache if it is possible for the remote to GC,
+// since in that case the cache could contain stale data.
 type ChunkCache interface {
-	// Put puts a slice of chunks into the cache.
-	Put(c []nbs.CompressedChunk) bool
+	// Insert some observed / fetched chunks into the cache. These
+	// chunks may or may not be returned in the future.
+	InsertChunks(cs []nbs.CompressedChunk)
+	// Get previously cached chunks, if they are still available.
+	GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk
 
-	// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
-	// is put in it's place
-	Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk
-
-	// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
-	Has(h hash.HashSet) (absent hash.HashSet)
-
-	// PutChunk puts a single chunk in the cache. true returns in the event that the chunk was cached successfully
-	// and false is returned if that chunk is already is the cache.
-	PutChunk(chunk nbs.CompressedChunk) bool
-
-	// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
-	// between the last time GetAndClearChunksToFlush was called and now.
-	GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
+	// Insert all hashes in |h| as existing in the remote.
+	InsertHas(h hash.HashSet)
+	// Returns the absent set from |h|, filtering it by records
+	// which are known to be present in the remote based on
+	// previous |InsertHas| calls.
+ GetCachedHas(h hash.HashSet) (absent hash.HashSet) } diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 70935dea1a2..6f1a0dfcb7a 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -46,7 +46,7 @@ import ( "github.com/dolthub/dolt/go/store/types" ) -var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached") +var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached") var ErrUploadFailed = errors.New("upload failed") @@ -123,6 +123,7 @@ type DoltChunkStore struct { root hash.Hash csClient remotesapi.ChunkStoreServiceClient finalizer func() error + wb WriteBuffer cache ChunkCache metadata *remotesapi.GetRepoMetadataResponse nbf *types.NomsBinFormat @@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa csClient: csClient, finalizer: func() error { return nil }, cache: newMapChunkCache(), + wb: newMapWriteBuffer(), metadata: metadata, nbf: nbf, httpFetcher: globalHttpFetcher, @@ -185,60 +187,40 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa return cs, nil } +func (dcs *DoltChunkStore) clone() *DoltChunkStore { + ret := *dcs + ret.repoToken = new(atomic.Value) + return &ret +} + func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: dcs.cache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: fetcher, - params: dcs.params, - stats: dcs.stats, - } + ret := dcs.clone() + ret.httpFetcher = fetcher + return ret +} + +func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore { + ret := dcs.clone() + ret.wb = noopWriteBuffer{} + return ret +} + +func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore { + ret := dcs.clone() + ret.wb = wb + return ret } func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: noopChunkCache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: dcs.httpFetcher, - params: dcs.params, - stats: dcs.stats, - logger: dcs.logger, - } + ret := dcs.clone() + ret.cache = noopChunkCache + return ret } func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore { - return &DoltChunkStore{ - repoId: dcs.repoId, - repoPath: dcs.repoPath, - repoToken: new(atomic.Value), - host: dcs.host, - root: dcs.root, - csClient: dcs.csClient, - finalizer: dcs.finalizer, - cache: cache, - metadata: dcs.metadata, - nbf: dcs.nbf, - httpFetcher: dcs.httpFetcher, - params: dcs.params, - stats: dcs.stats, - logger: dcs.logger, - } + ret := dcs.clone() + ret.cache = cache + return ret } func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore { @@ -344,7 +326,8 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed") defer span.End() - hashToChunk := dcs.cache.Get(hashes) + hashToChunk := dcs.cache.GetCachedChunks(hashes) + dcs.wb.AddPendingChunks(hashes, hashToChunk) 
 	span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
 	atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
@@ -604,9 +587,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
 			}
 			// Don't forward on empty/not found chunks.
 			if len(cc.CompressedData) > 0 {
-				if dcs.cache.PutChunk(cc) {
-					return ErrCacheCapacityExceeded
-				}
+				dcs.cache.InsertChunks([]nbs.CompressedChunk{cc})
 				found(egCtx, cc)
 			}
 		}
@@ -634,8 +615,8 @@ const maxHasManyBatchSize = 16 * 1024
 // absent from the store.
 func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
 	// get the set of hashes that isn't already in the cache
-	notCached := dcs.cache.Has(hashes)
-
+	notCached := dcs.cache.GetCachedHas(hashes)
+	dcs.wb.RemovePresentChunks(notCached)
 	if len(notCached) == 0 {
 		return notCached, nil
 	}
@@ -644,7 +625,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 	hashSl, byteSl := HashSetToSlices(notCached)
 
 	absent := make(hash.HashSet)
-	var found []nbs.CompressedChunk
+	found := make(hash.HashSet)
 	var err error
 
 	batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
@@ -685,8 +666,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 				absent[currHash] = struct{}{}
 				j++
 			} else {
-				c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{}))
-				found = append(found, c)
+				found.Insert(currHash)
 			}
 		}
 
@@ -702,9 +682,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 	}
 
 	if len(found) > 0 {
-		if dcs.cache.Put(found) {
-			return hash.HashSet{}, ErrCacheCapacityExceeded
-		}
+		dcs.cache.InsertHas(found)
 	}
 
 	return absent, nil
@@ -738,8 +716,9 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
 	}
 
 	cc := nbs.ChunkToCompressedChunk(c)
-	if dcs.cache.Put([]nbs.CompressedChunk{cc}) {
-		return ErrCacheCapacityExceeded
+	err = dcs.wb.Put(cc)
+	if err != nil {
+		return err
 	}
 	return nil
 }
@@ -884,7 +863,7 @@ func (dcs *DoltChunkStore) Close() error {
 // getting this working using the simplest approach first
 func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
-	hashToChunk := dcs.cache.GetAndClearChunksToFlush()
+	hashToChunk := dcs.wb.GetAllAndClear()
 
 	if len(hashToChunk) == 0 {
 		return map[hash.Hash]int{}, nil
diff --git a/go/libraries/doltcore/remotestorage/map_chunk_cache.go b/go/libraries/doltcore/remotestorage/map_chunk_cache.go
index ea5dad323ef..2fa46a00f0e 100644
--- a/go/libraries/doltcore/remotestorage/map_chunk_cache.go
+++ b/go/libraries/doltcore/remotestorage/map_chunk_cache.go
@@ -19,125 +19,70 @@ import (
-	"sync"
-
 	"github.com/dolthub/dolt/go/store/hash"
 	"github.com/dolthub/dolt/go/store/nbs"
+	lru "github.com/hashicorp/golang-lru/v2"
 )
 
-// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
+// mapChunkCache is a simple ChunkCache implementation that stores
+// cached chunks and 'has' presence records in LRU caches.
 type mapChunkCache struct {
-	mu          *sync.Mutex
-	hashToChunk map[hash.Hash]nbs.CompressedChunk
-	toFlush     map[hash.Hash]nbs.CompressedChunk
-	cm          CapacityMonitor
+	chunks *lru.TwoQueueCache[hash.Hash, nbs.CompressedChunk]
+	has    *lru.TwoQueueCache[hash.Hash, struct{}]
 }
 
+const defaultCacheChunkCapacity = 32 * 1024
+const defaultCacheHasCapacity = 1024 * 1024
+
 func newMapChunkCache() *mapChunkCache {
-	return &mapChunkCache{
-		&sync.Mutex{},
-		make(map[hash.Hash]nbs.CompressedChunk),
-		make(map[hash.Hash]nbs.CompressedChunk),
-		NewUncappedCapacityMonitor(),
-	}
+	return NewMapChunkCacheWithMaxCapacity(defaultCacheChunkCapacity, defaultCacheHasCapacity)
 }
 
 // used by DoltHub API
-func NewMapChunkCacheWithMaxCapacity(maxCapacity int64) *mapChunkCache {
+func NewMapChunkCacheWithMaxCapacity(maxChunkCapacity, maxHasCapacity int) *mapChunkCache {
+	chunks, err := lru.New2Q[hash.Hash, nbs.CompressedChunk](maxChunkCapacity)
+	if err != nil {
+		panic(err)
+	}
+	has, err := lru.New2Q[hash.Hash, struct{}](maxHasCapacity)
+	if err != nil {
+		panic(err)
+	}
 	return &mapChunkCache{
-		&sync.Mutex{},
-		make(map[hash.Hash]nbs.CompressedChunk),
-		make(map[hash.Hash]nbs.CompressedChunk),
-		NewFixedCapacityMonitor(maxCapacity),
+		chunks,
+		has,
 	}
 }
 
-// Put puts a slice of chunks into the cache.
-func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) bool {
-	mcc.mu.Lock()
-	defer mcc.mu.Unlock()
-
-	for i := 0; i < len(chnks); i++ {
-		c := chnks[i]
-		h := c.Hash()
-
-		if curr, ok := mcc.hashToChunk[h]; ok {
-			if !curr.IsEmpty() {
-				continue
-			}
-		}
-
-		if mcc.cm.CapacityExceeded(len(c.FullCompressedChunk)) {
-			return true
-		}
-
-		mcc.hashToChunk[h] = c
-
-		if !c.IsEmpty() {
-			mcc.toFlush[h] = c
-		}
+func (cache *mapChunkCache) InsertChunks(cs []nbs.CompressedChunk) {
+	for _, c := range cs {
+		cache.chunks.Add(c.Hash(), c)
 	}
-
-	return false
 }
 
-// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
-// is put in it's place
-func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
-	hashToChunk := make(map[hash.Hash]nbs.CompressedChunk)
-
-	mcc.mu.Lock()
-	defer mcc.mu.Unlock()
-
-	for h := range hashes {
-		if c, ok := mcc.hashToChunk[h]; ok {
-			hashToChunk[h] = c
-		} else {
-			hashToChunk[h] = nbs.EmptyCompressedChunk
+func (cache *mapChunkCache) GetCachedChunks(hs hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
+	ret := make(map[hash.Hash]nbs.CompressedChunk)
+	for h := range hs {
+		c, ok := cache.chunks.Get(h)
+		if ok {
+			ret[h] = c
 		}
 	}
-
-	return hashToChunk
+	return ret
 }
 
-// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
-func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) { - absent = make(hash.HashSet) - - mcc.mu.Lock() - defer mcc.mu.Unlock() - - for h := range hashes { - if _, ok := mcc.hashToChunk[h]; !ok { - absent[h] = struct{}{} - } +func (cache *mapChunkCache) InsertHas(hs hash.HashSet) { + for h := range hs { + cache.has.Add(h, struct{}{}) } - - return absent } -func (mcc *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool { - mcc.mu.Lock() - defer mcc.mu.Unlock() - - h := ch.Hash() - if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() { - if mcc.cm.CapacityExceeded(len(ch.FullCompressedChunk)) { - return true +func (cache *mapChunkCache) GetCachedHas(hs hash.HashSet) hash.HashSet { + ret := make(hash.HashSet) + for h := range hs { + if !cache.has.Contains(h) { + ret.Insert(h) } - mcc.hashToChunk[h] = ch - mcc.toFlush[h] = ch } - - return false -} - -// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache -// between the last time GetAndClearChunksToFlush was called and now. -func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk { - newToFlush := make(map[hash.Hash]nbs.CompressedChunk) - - mcc.mu.Lock() - defer mcc.mu.Unlock() - - toFlush := mcc.toFlush - mcc.toFlush = newToFlush - - return toFlush + return ret } diff --git a/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go b/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go deleted file mode 100644 index 6944eb98c35..00000000000 --- a/go/libraries/doltcore/remotestorage/map_chunk_cache_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package remotestorage - -import ( - "math/rand" - "reflect" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/nbs" -) - -func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.CompressedChunk) { - chks := make([]nbs.CompressedChunk, n) - hashes := make(hash.HashSet) - for i := 0; i < n; i++ { - size := int(rng.Int31n(99) + 1) - bytes := make([]byte, size) - for j := 0; j < size; j++ { - bytes[j] = byte(rng.Int31n(255)) - } - - chk := nbs.ChunkToCompressedChunk(chunks.NewChunk(bytes)) - chks[i] = chk - - hashes[chk.Hash()] = struct{}{} - } - - return hashes, chks -} - -func TestMapChunkCache(t *testing.T) { - const chunkBatchSize = 10 - - seed := time.Now().UnixNano() - rng := rand.New(rand.NewSource(seed)) - hashes, chks := genRandomChunks(rng, chunkBatchSize) - - mapChunkCache := newMapChunkCache() - mapChunkCache.Put(chks) - hashToChunk := mapChunkCache.Get(hashes) - - assert.Equal(t, len(hashToChunk), chunkBatchSize, "Did not read back all chunks (seed %d)", seed) - - absent := mapChunkCache.Has(hashes) - - assert.Equal(t, len(absent), 0, "Missing chunks that were added (seed %d)", seed) - - toFlush := mapChunkCache.GetAndClearChunksToFlush() - - assert.True(t, reflect.DeepEqual(toFlush, hashToChunk), "unexpected or missing chunks to flush (seed %d)", seed) - - moreHashes, moreChks := genRandomChunks(rng, chunkBatchSize) - - joinedHashes := make(hash.HashSet) - - for h := range hashes { - joinedHashes[h] = struct{}{} - } - - for h := range moreHashes { - joinedHashes[h] = struct{}{} - } - - absent = mapChunkCache.Has(joinedHashes) - - assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed) - - mapChunkCache.PutChunk(chks[0]) - mapChunkCache.PutChunk(moreChks[0]) - - toFlush = mapChunkCache.GetAndClearChunksToFlush() - - expected := map[hash.Hash]nbs.CompressedChunk{moreChks[0].Hash(): moreChks[0]} - eq := reflect.DeepEqual(toFlush, expected) - assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed) -} diff --git a/go/libraries/doltcore/remotestorage/noop_chunk_cache.go b/go/libraries/doltcore/remotestorage/noop_chunk_cache.go index 8edf589c31b..884bdff9347 100644 --- a/go/libraries/doltcore/remotestorage/noop_chunk_cache.go +++ b/go/libraries/doltcore/remotestorage/noop_chunk_cache.go @@ -20,31 +20,22 @@ import ( ) // noopChunkCache is a ChunkCache implementation that stores nothing -// ever. Using a noopChunkCache with a remotestore.DoltChunkStore -// will cause the DoltChunkStore to behave incorrectly when _writing_ -// dolt repositories; this should only be used for read-only use -// cases. +// ever. This causes all fetches to go to the remote server. 
var noopChunkCache = &noopChunkCacheImpl{}
 
 type noopChunkCacheImpl struct {
 }
 
-func (*noopChunkCacheImpl) Put(chnks []nbs.CompressedChunk) bool {
-	return false
+func (*noopChunkCacheImpl) InsertChunks(cs []nbs.CompressedChunk) {
 }
 
-func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
-	return make(map[hash.Hash]nbs.CompressedChunk)
+func (*noopChunkCacheImpl) GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
+	return nil
 }
 
-func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
-	return hashes
+func (*noopChunkCacheImpl) InsertHas(h hash.HashSet) {
 }
 
-func (*noopChunkCacheImpl) PutChunk(ch nbs.CompressedChunk) bool {
-	return false
-}
-
-func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
-	panic("noopChunkCache does not support GetAndClearChunksToFlush().")
+func (*noopChunkCacheImpl) GetCachedHas(h hash.HashSet) (absent hash.HashSet) {
+	return h
 }
diff --git a/go/libraries/doltcore/remotestorage/writebuffer.go b/go/libraries/doltcore/remotestorage/writebuffer.go
new file mode 100644
index 00000000000..41f274bcc18
--- /dev/null
+++ b/go/libraries/doltcore/remotestorage/writebuffer.go
@@ -0,0 +1,98 @@
+package remotestorage
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/dolthub/dolt/go/store/hash"
+	"github.com/dolthub/dolt/go/store/nbs"
+)
+
+type WriteBuffer interface {
+	Put(nbs.CompressedChunk) error
+	GetAllAndClear() map[hash.Hash]nbs.CompressedChunk
+
+	// ChunkStore clients expect to read their own writes before a commit.
+	// On the get path, remotestorage should add pending chunks to its
+	// result set. On the HasMany path, remotestorage should remove
+	// present chunks from the absent set it returns.
+	AddPendingChunks(h hash.HashSet, res map[hash.Hash]nbs.CompressedChunk)
+	RemovePresentChunks(h hash.HashSet)
+}
+
+type noopWriteBuffer struct {
+}
+
+var _ WriteBuffer = noopWriteBuffer{}
+
+func (noopWriteBuffer) Put(nbs.CompressedChunk) error {
+	return errors.New("unsupported operation: write on a read-only remotestorage chunk store")
+}
+
+func (noopWriteBuffer) GetAllAndClear() map[hash.Hash]nbs.CompressedChunk {
+	panic("attempt to upload chunks on a read-only remotestorage chunk store")
+}
+
+func (noopWriteBuffer) AddPendingChunks(hash.HashSet, map[hash.Hash]nbs.CompressedChunk) {
+}
+
+func (noopWriteBuffer) RemovePresentChunks(hash.HashSet) {
+}
+
+// A simple WriteBuffer which buffers an unbounded amount of data in
+// memory, holding it until it is flushed with GetAllAndClear.
+type mapWriteBuffer struct {
+	mu     sync.Mutex
+	chunks map[hash.Hash]nbs.CompressedChunk
+}
+
+func newMapWriteBuffer() *mapWriteBuffer {
+	var ret mapWriteBuffer
+	ret.chunks = make(map[hash.Hash]nbs.CompressedChunk)
+	return &ret
+}
+
+func (b *mapWriteBuffer) Put(cc nbs.CompressedChunk) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.chunks[cc.H] = cc
+	return nil
+}
+
+// GetAllAndClear hands ownership of the buffered chunks to the caller
+// and resets the buffer.
+func (b *mapWriteBuffer) GetAllAndClear() map[hash.Hash]nbs.CompressedChunk {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	ret := b.chunks
+	b.chunks = make(map[hash.Hash]nbs.CompressedChunk)
+	return ret
+}
+
+func (b *mapWriteBuffer) AddPendingChunks(hs hash.HashSet, res map[hash.Hash]nbs.CompressedChunk) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	for h := range hs {
+		cc, ok := b.chunks[h]
+		if ok {
+			res[h] = cc
+		}
+	}
+}
+
+func (b *mapWriteBuffer) RemovePresentChunks(absent hash.HashSet) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	// Iterate over whichever collection is smaller.
+	if len(b.chunks) < len(absent) {
+		for h := range b.chunks {
+			absent.Remove(h)
+		}
+	} else {
+		var toRemove []hash.Hash
+		for h := range absent {
+			if _, ok := b.chunks[h]; ok {
+				toRemove = append(toRemove, h)
+			}
+		}
+		for _, h := range toRemove {
+			absent.Remove(h)
+		}
+	}
+}
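
After this change, the read paths compose the two concerns in a fixed order: consult the ChunkCache, then overlay the WriteBuffer, so buffered-but-unflushed writes stay visible to readers. The following is a minimal sketch of that flow under the new interfaces, not code from this patch; getManySketch and fetchFromRemote are hypothetical names standing in for the wiring that GetManyCompressed does with the batched download path in the diff above.

package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// getManySketch mirrors the read path after this change: cached chunks
// and pending (buffered) writes are served locally; only the remainder
// is fetched from the remote, and fetched chunks are inserted into the
// cache for future reads. fetchFromRemote is a hypothetical stand-in
// for the real download path.
func getManySketch(
	cache ChunkCache,
	wb WriteBuffer,
	hashes hash.HashSet,
	fetchFromRemote func(hash.HashSet) ([]nbs.CompressedChunk, error),
) (map[hash.Hash]nbs.CompressedChunk, error) {
	// Serve what we can from the read cache, then overlay pending writes.
	res := cache.GetCachedChunks(hashes)
	wb.AddPendingChunks(hashes, res)

	// Anything still missing must come from the remote.
	missing := make(hash.HashSet)
	for h := range hashes {
		if _, ok := res[h]; !ok {
			missing.Insert(h)
		}
	}
	if len(missing) == 0 {
		return res, nil
	}

	fetched, err := fetchFromRemote(missing)
	if err != nil {
		return nil, err
	}
	// Remember fetched chunks for future reads; safe only while the
	// remote does not GC them away.
	cache.InsertChunks(fetched)
	for _, cc := range fetched {
		res[cc.Hash()] = cc
	}
	return res, nil
}

The HasMany path is symmetric: GetCachedHas filters out hashes already known to be present, RemovePresentChunks accounts for buffered writes, and InsertHas records the presence results returned by the server.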