go: remotestorage: chunk_store.go: Clean up ChunkCache.
When DoltChunkStore was implemented, it needed to do write buffering in order
to read its own writes and to flush table files to the upstream. At some point,
read caching and caching of HasMany results were added onto the write buffer,
creating confusion and despair. This change separates the use cases back out.
reltuk committed Feb 13, 2025
1 parent 21547c4 commit 5eea56b
Showing 6 changed files with 202 additions and 285 deletions.
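For orientation before the diff: the change routes writes through a new WriteBuffer, while ChunkCache keeps only read caching. The WriteBuffer interface itself is not in the hunks shown here, so the sketch below is reconstructed from its call sites in this diff (Put, AddPendingChunks, RemovePresentChunks, GetAllAndClear); treat the exact signatures as an assumption.

// Sketch of the WriteBuffer contract, inferred from call sites in this
// commit; the committed interface may differ in detail.
package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

type WriteBuffer interface {
	// Put buffers a chunk for a later upload. It can fail with
	// ErrWriteBufferCapacityExceeded when the buffer is full.
	Put(cc nbs.CompressedChunk) error
	// AddPendingChunks copies buffered chunks whose hashes appear in |h|
	// into |res|, so reads observe writes that have not been flushed yet.
	AddPendingChunks(h hash.HashSet, res map[hash.Hash]nbs.CompressedChunk)
	// RemovePresentChunks deletes from |h| every hash that is currently
	// buffered, so HasMany counts pending writes as present.
	RemovePresentChunks(h hash.HashSet)
	// GetAllAndClear returns all buffered chunks and empties the buffer;
	// uploadChunks uses it to take ownership of a flush batch.
	GetAllAndClear() map[hash.Hash]nbs.CompressedChunk
}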
32 changes: 15 additions & 17 deletions go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -19,23 +19,21 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)

// ChunkCache is an interface used for caching chunks
// ChunkCache is an interface used for caching chunks and has-presence
// results that have already been fetched from remotestorage. Care should
// be taken when using ChunkCache if it is possible for the remote to GC,
// since in that case the cache could contain stale data.
type ChunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []nbs.CompressedChunk) bool
// Insert some observed / fetched chunks into the cache. These
// chunks may or may not be returned in the future.
InsertChunks(cs []nbs.CompressedChunk)
// Get previously cached chunks, if they are still available.
GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk

// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk

// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)

// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully,
// and false is returned if that chunk is already in the cache.
PutChunk(chunk nbs.CompressedChunk) bool

// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
// Insert all hashes in |h| as existing in the remote.
InsertHas(h hash.HashSet)
// Returns the absent set from |h|, filtering it by records
// which are known to be present in the remote based on
// previous |InsertHas| calls.
GetCachedHas(h hash.HashSet) (absent hash.HashSet)
}
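To make the contract concrete, here is a toy, unbounded, single-threaded implementation of the interface above; the committed newMapChunkCache is the authoritative version, and a real cache would bound memory and be safe for concurrent use.

// Toy ChunkCache: unbounded maps, no locking. Illustration only.
type toyChunkCache struct {
	chunks map[hash.Hash]nbs.CompressedChunk
	has    hash.HashSet
}

func newToyChunkCache() *toyChunkCache {
	return &toyChunkCache{
		chunks: make(map[hash.Hash]nbs.CompressedChunk),
		has:    make(hash.HashSet),
	}
}

func (c *toyChunkCache) InsertChunks(cs []nbs.CompressedChunk) {
	for _, cc := range cs {
		c.chunks[cc.Hash()] = cc
	}
}

func (c *toyChunkCache) GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
	res := make(map[hash.Hash]nbs.CompressedChunk, len(h))
	for x := range h {
		if cc, ok := c.chunks[x]; ok {
			res[x] = cc
		}
	}
	return res
}

func (c *toyChunkCache) InsertHas(h hash.HashSet) {
	for x := range h {
		c.has.Insert(x)
	}
}

func (c *toyChunkCache) GetCachedHas(h hash.HashSet) (absent hash.HashSet) {
	absent = make(hash.HashSet, len(h))
	for x := range h {
		if _, ok := c.has[x]; !ok {
			absent.Insert(x)
		}
	}
	return absent
}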
105 changes: 42 additions & 63 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/dolthub/dolt/go/store/types"
)

var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached")
var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached")

var ErrUploadFailed = errors.New("upload failed")

@@ -123,6 +123,7 @@ type DoltChunkStore struct {
root hash.Hash
csClient remotesapi.ChunkStoreServiceClient
finalizer func() error
wb WriteBuffer
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
@@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
csClient: csClient,
finalizer: func() error { return nil },
cache: newMapChunkCache(),
wb: newMapWriteBuffer(),
metadata: metadata,
nbf: nbf,
httpFetcher: globalHttpFetcher,
@@ -185,60 +187,40 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
return cs, nil
}

func (dcs *DoltChunkStore) clone() *DoltChunkStore {
ret := *dcs
ret.repoToken = new(atomic.Value)
return &ret
}

func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: fetcher,
params: dcs.params,
stats: dcs.stats,
}
ret := dcs.clone()
ret.httpFetcher = fetcher
return ret
}

func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore {
ret := dcs.clone()
ret.wb = noopWriteBuffer{}
return ret
}

func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore {
ret := dcs.clone()
ret.wb = wb
return ret
}

func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: noopChunkCache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: dcs.httpFetcher,
params: dcs.params,
stats: dcs.stats,
logger: dcs.logger,
}
ret := dcs.clone()
ret.cache = noopChunkCache
return ret
}

func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: dcs.httpFetcher,
params: dcs.params,
stats: dcs.stats,
logger: dcs.logger,
}
ret := dcs.clone()
ret.cache = cache
return ret
}
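All four With* helpers now funnel through clone(), which shallow-copies the store and gives the copy a fresh repoToken atomic, matching what the hand-rolled copies above did field by field. A hypothetical call site combining two of them:

// Hypothetical helper: derive a handle that fetches through |f| and
// skips read caching, sharing everything else with |dcs|.
func uncachedHandle(dcs *DoltChunkStore, f HTTPFetcher) *DoltChunkStore {
	return dcs.WithHTTPFetcher(f).WithNoopChunkCache()
}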

func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore {
@@ -344,7 +326,8 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed")
defer span.End()

hashToChunk := dcs.cache.Get(hashes)
hashToChunk := dcs.cache.GetCachedChunks(hashes)
dcs.wb.AddPendingChunks(hashes, hashToChunk)

span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
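Read-your-writes now comes from composing the two structures instead of one overloaded cache: the read cache answers first, then unflushed writes are overlaid. A condensed sketch of the lookup order (field and method names as in this diff; the wrapper function is hypothetical):

// Condensed sketch of the local half of GetManyCompressed.
func lookupLocal(dcs *DoltChunkStore, hashes hash.HashSet) (found map[hash.Hash]nbs.CompressedChunk, miss hash.HashSet) {
	found = dcs.cache.GetCachedChunks(hashes) // 1. previously fetched chunks
	dcs.wb.AddPendingChunks(hashes, found)    // 2. overlay unflushed writes
	miss = make(hash.HashSet)
	for h := range hashes {
		if _, ok := found[h]; !ok {
			miss.Insert(h) // 3. only these need a round trip to the remote
		}
	}
	return found, miss
}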
@@ -604,9 +587,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
}
// Don't forward on empty/not found chunks.
if len(cc.CompressedData) > 0 {
if dcs.cache.PutChunk(cc) {
return ErrCacheCapacityExceeded
}
dcs.cache.InsertChunks([]nbs.CompressedChunk{cc})
found(egCtx, cc)
}
}
@@ -634,8 +615,8 @@ const maxHasManyBatchSize = 16 * 1024
// absent from the store.
func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
// get the set of hashes that isn't already in the cache
notCached := dcs.cache.Has(hashes)

notCached := dcs.cache.GetCachedHas(hashes)
dcs.wb.RemovePresentChunks(notCached)
if len(notCached) == 0 {
return notCached, nil
}
@@ -644,7 +625,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
hashSl, byteSl := HashSetToSlices(notCached)

absent := make(hash.HashSet)
var found []nbs.CompressedChunk
found := make(hash.HashSet)
var err error

batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
Expand Down Expand Up @@ -685,8 +666,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
absent[currHash] = struct{}{}
j++
} else {
c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{}))
found = append(found, c)
found.Insert(currHash)
}
}

@@ -702,9 +682,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}

if len(found) > 0 {
if dcs.cache.Put(found) {
return hash.HashSet{}, ErrCacheCapacityExceeded
}
dcs.cache.InsertHas(found)
}

return absent, nil
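The presence check follows the same composition: hashes known locally to be present (cached HasMany answers, plus writes waiting in the buffer) are stripped before the batched RPC, and positive answers are fed back with InsertHas. Note that only presence is cached, never absence, since the remote can gain chunks at any time. A schematic version, with the RPC loop stubbed out:

// Schematic HasMany; queryRemoteHasMany is a hypothetical stand-in for
// the batched gRPC loop in the real method.
func hasManySketch(ctx context.Context, dcs *DoltChunkStore, hashes hash.HashSet) (hash.HashSet, error) {
	notCached := dcs.cache.GetCachedHas(hashes) // drop hashes already known present
	dcs.wb.RemovePresentChunks(notCached)       // drop hashes pending upload
	if len(notCached) == 0 {
		return notCached, nil // everything is locally known to be present
	}
	absent, found, err := queryRemoteHasMany(ctx, dcs, notCached)
	if err != nil {
		return nil, err
	}
	dcs.cache.InsertHas(found) // remember positive answers for next time
	return absent, nil
}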
@@ -738,8 +716,9 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
}

cc := nbs.ChunkToCompressedChunk(c)
if dcs.cache.Put([]nbs.CompressedChunk{cc}) {
return ErrCacheCapacityExceeded
err = dcs.wb.Put(cc)
if err != nil {
return err
}
return nil
}
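Put now lands in the write buffer, and a full buffer surfaces as ErrWriteBufferCapacityExceeded from Put itself rather than a bool from the cache. A toy bounded buffer consistent with the WriteBuffer sketch near the top (illustration only; the committed newMapWriteBuffer is authoritative):

// Toy bounded WriteBuffer: byte-count capacity, no locking.
type toyWriteBuffer struct {
	pending  map[hash.Hash]nbs.CompressedChunk
	size     uint64
	capacity uint64
}

func (b *toyWriteBuffer) Put(cc nbs.CompressedChunk) error {
	if _, ok := b.pending[cc.Hash()]; ok {
		return nil // already buffered
	}
	sz := uint64(len(cc.FullCompressedChunk))
	if b.size+sz > b.capacity {
		return ErrWriteBufferCapacityExceeded
	}
	b.pending[cc.Hash()] = cc
	b.size += sz
	return nil
}

func (b *toyWriteBuffer) AddPendingChunks(h hash.HashSet, res map[hash.Hash]nbs.CompressedChunk) {
	for x := range h {
		if cc, ok := b.pending[x]; ok {
			res[x] = cc
		}
	}
}

func (b *toyWriteBuffer) RemovePresentChunks(h hash.HashSet) {
	for x := range h {
		if _, ok := b.pending[x]; ok {
			delete(h, x)
		}
	}
}

func (b *toyWriteBuffer) GetAllAndClear() map[hash.Hash]nbs.CompressedChunk {
	out := b.pending
	b.pending = make(map[hash.Hash]nbs.CompressedChunk)
	b.size = 0
	return out
}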
@@ -884,7 +863,7 @@ func (dcs *DoltChunkStore) Close() error {

// getting this working using the simplest approach first
func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
hashToChunk := dcs.cache.GetAndClearChunksToFlush()
hashToChunk := dcs.wb.GetAllAndClear()

if len(hashToChunk) == 0 {
return map[hash.Hash]int{}, nil
(Diff for the remaining four changed files not loaded.)
