Archive DDict cache and multi-file bug fixes #8018

Merged: 9 commits, Jun 14, 2024
go/store/nbs/archive_build.go (32 changes: 17 additions, 15 deletions)
@@ -38,6 +38,8 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
outPath, _ := gs.oldGen.Path()
oldgen := gs.oldGen.tables.upstream

swapMap := make(map[hash.Hash]hash.Hash)

for tf, ogcs := range oldgen {
// NM4 - We should probably provide a way to pick a particular table file to build an archive for.

@@ -58,23 +60,23 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
return err
}

// p("Verified table file: %s\n", archivePath)
swapMap[tf] = archiveName
}

//NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
specs, err := gs.oldGen.tables.toSpecs()
newSpecs := make([]tableSpec, 0, len(specs))
for _, spec := range specs {
if spec.name != tf {
newSpecs = append(newSpecs, spec)
} else {
newSpecs = append(newSpecs, tableSpec{archiveName, spec.chunkCount})
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
if err != nil {
return err
//NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
specs, err := gs.oldGen.tables.toSpecs()
newSpecs := make([]tableSpec, 0, len(specs))
for _, spec := range specs {
if newSpec, exists := swapMap[spec.name]; exists {
newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
} else {
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
if err != nil {
return err
}
} else {
return errors.New("Modern DB Expected")
}
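
Reviewer note: the fix above collects table-file-to-archive renames in swapMap during the conversion loop, then rewrites the spec list in a single pass afterwards, so one swapTables call covers every converted file instead of one call per iteration. A minimal sketch of that rewrite pass, with string names standing in for hash.Hash and a simplified tableSpec (both assumptions, not the real types):

package main

import "fmt"

// Simplified stand-in for the diff's type: names are strings here,
// while the real code keys swapMap by hash.Hash.
type tableSpec struct {
	name       string
	chunkCount uint32
}

func main() {
	// Filled in during the conversion loop: old table file -> new archive.
	swapMap := map[string]string{"table-a": "archive-a"}

	specs := []tableSpec{{"table-a", 10}, {"table-b", 20}}

	// One pass after the loop: swap renamed specs, keep everything else.
	newSpecs := make([]tableSpec, 0, len(specs))
	for _, spec := range specs {
		if archiveName, ok := swapMap[spec.name]; ok {
			newSpecs = append(newSpecs, tableSpec{archiveName, spec.chunkCount})
		} else {
			newSpecs = append(newSpecs, spec)
		}
	}
	fmt.Println(newSpecs) // [{archive-a 10} {table-b 20}]
}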
@@ -277,7 +279,7 @@ func gatherAllChunks(ctx context.Context, cs chunkSource, idx tableIndex) (map[h
allChunks[h] = c
}, &stats)
if err != nil {
panic(err)
return nil, nil, err
}
if !allFound { // Unlikely to happen, given we got the list of chunks from this index.
return nil, nil, errors.New("missing chunks")
go/store/nbs/archive_chunk_source.go (4 changes: 2 additions, 2 deletions)
@@ -81,15 +81,15 @@ func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, r
foundAll := true
for _, req := range reqs {
data, err := acs.aRdr.get(*req.a)
if err != nil {
if err != nil || data == nil {
foundAll = false
} else {
chunk := chunks.NewChunk(data)
found(ctx, &chunk)
req.found = true
}
}
return foundAll, nil
return !foundAll, nil
}
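
Reviewer note: the two changes above are the multi-file bug fixes. A nil data result now counts as a miss, and the boolean result is inverted: judging from this fix, getMany's bool evidently means "requests remain unfound" rather than "all were found", so the old return foundAll told callers the opposite of the truth. A self-contained sketch of that assumed contract (simplified signature, string keys, no errgroup):

package main

import "fmt"

// getMany models the assumed contract: the bool reports whether any
// requested chunk is still missing, so callers know to try other sources.
func getMany(have map[string][]byte, reqs []string, found func([]byte)) bool {
	foundAll := true
	for _, r := range reqs {
		if data, ok := have[r]; ok && data != nil {
			found(data)
		} else {
			foundAll = false // nil data counts as a miss, matching the fix
		}
	}
	return !foundAll // "remaining", not "foundAll"
}

func main() {
	have := map[string][]byte{"a": {1}}
	remaining := getMany(have, []string{"a", "b"}, func([]byte) {})
	fmt.Println(remaining) // true: "b" was not found here
}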

func (acs archiveChunkSource) count() (uint32, error) {
go/store/nbs/archive_reader.go (69 changes: 41 additions, 28 deletions)
@@ -24,6 +24,7 @@ import (
"math/bits"

"github.com/dolthub/gozstd"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"

"github.com/dolthub/dolt/go/store/hash"
@@ -36,6 +37,7 @@ type archiveReader struct {
chunkRefs []chunkRef
suffixes []suffix
footer footer
dictCache *lru.TwoQueueCache[uint32, *gozstd.DDict]
}

type chunkRef struct {
@@ -178,13 +180,19 @@ func newArchiveReader(reader io.ReaderAt, fileSize uint64) (archiveReader, error
}
}

dictCache, err := lru.New2Q[uint32, *gozstd.DDict](256)
if err != nil {
return archiveReader{}, err
}

return archiveReader{
reader: reader,
prefixes: prefixes,
byteSpans: byteSpans,
chunkRefs: chunks,
suffixes: suffixes,
footer: footer,
dictCache: dictCache,
}, nil
}
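
Reviewer note: the new dictCache avoids rebuilding a zstd decompression dictionary (DDict) for every chunk read; one dictionary span is shared by many chunks, so the same DDict gets reused across gets. A minimal sketch of the golang-lru/v2 two-queue cache used here, with string values standing in for *gozstd.DDict and the 256-entry size taken from the diff:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// New2Q builds a cache that balances recently used and frequently
	// used entries, a good fit for hot shared dictionaries.
	cache, err := lru.New2Q[uint32, string](256)
	if err != nil {
		panic(err)
	}

	cache.Add(7, "DDict for byte span 7")
	if v, ok := cache.Get(7); ok {
		fmt.Println(v) // cache hit: no decompress/NewDDict work needed
	}
}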

@@ -227,28 +235,26 @@ func loadFooter(reader io.ReaderAt, fileSize uint64) (f footer, err error) {
return
}

func (ai archiveReader) search(hash hash.Hash) (int, bool) {
// search returns the index of the hash in the archive. If the hash is not found, -1 is returned.
func (ai archiveReader) search(hash hash.Hash) int {
prefix := hash.Prefix()
possibleMatch := prollyBinSearch(ai.prefixes, prefix)
targetSfx := hash.Suffix()

for i := 0; i < len(ai.suffixes); i++ {
idx := possibleMatch + i

if ai.prefixes[idx] != prefix {
return -1, false
}
if possibleMatch < 0 || possibleMatch >= len(ai.prefixes) {
return -1
}

for idx := possibleMatch; idx < len(ai.prefixes) && ai.prefixes[idx] == prefix; idx++ {
if ai.suffixes[idx] == suffix(targetSfx) {
return idx, true
return idx
}
}
return -1, true
return -1
}

func (ai archiveReader) has(hash hash.Hash) bool {
_, found := ai.search(hash)
return found
return ai.search(hash) >= 0
}
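
Reviewer note: the rewritten search drops the (int, bool) pair for a single index with -1 as the not-found sentinel, bounds-checks the binary-search result before indexing (the old loop could walk past the end of the slice), and scans forward only while prefixes still match. A sketch of the same shape over plain slices, with uint64 prefixes and string suffixes standing in for the real types:

package main

import (
	"fmt"
	"sort"
)

// search finds the first entry whose prefix and suffix both match,
// or -1. Duplicate prefixes are handled by the forward scan.
func search(prefixes []uint64, suffixes []string, prefix uint64, sfx string) int {
	i := sort.Search(len(prefixes), func(j int) bool { return prefixes[j] >= prefix })
	if i < 0 || i >= len(prefixes) {
		return -1 // bounds check before indexing, mirroring the fix
	}
	for ; i < len(prefixes) && prefixes[i] == prefix; i++ {
		if suffixes[i] == sfx {
			return i
		}
	}
	return -1
}

func main() {
	prefixes := []uint64{1, 2, 2, 3}
	suffixes := []string{"a", "b", "c", "d"}
	fmt.Println(search(prefixes, suffixes, 2, "c")) // 2
	fmt.Println(search(prefixes, suffixes, 9, "z")) // -1
}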

// get returns the decompressed data for the given hash. If the hash is not found, nil is returned (not an error)
Expand All @@ -262,16 +268,7 @@ func (ai archiveReader) get(hash hash.Hash) ([]byte, error) {
if dict == nil {
result, err = gozstd.Decompress(nil, data)
} else {
dcmpDict, e2 := gozstd.Decompress(nil, dict)
if e2 != nil {
return nil, e2
}

dDict, e2 := gozstd.NewDDict(dcmpDict)
if e2 != nil {
return nil, e2
}
result, err = gozstd.DecompressDict(nil, data, dDict)
result, err = gozstd.DecompressDict(nil, data, dict)
}
if err != nil {
return nil, err
@@ -303,18 +300,34 @@ func (ai archiveReader) readByteSpan(bs byteSpan) ([]byte, error) {
// no error is returned in this case. Errors will only be returned if there is an io error.
//
// The data returned is still compressed, regardless of the dictionary being present or not.
func (ai archiveReader) getRaw(hash hash.Hash) (dict, data []byte, err error) {
idx, found := ai.search(hash)
if !found {
func (ai archiveReader) getRaw(hash hash.Hash) (dict *gozstd.DDict, data []byte, err error) {
idx := ai.search(hash)
if idx < 0 {
return nil, nil, nil
}

chunkRef := ai.chunkRefs[idx]
if chunkRef.dict != 0 {
byteSpan := ai.byteSpans[chunkRef.dict]
dict, err = ai.readByteSpan(byteSpan)
if err != nil {
return nil, nil, err
if cached, cacheHit := ai.dictCache.Get(chunkRef.dict); cacheHit {
dict = cached
} else {
byteSpan := ai.byteSpans[chunkRef.dict]
dictBytes, err := ai.readByteSpan(byteSpan)
if err != nil {
return nil, nil, err
}
// Dictionaries are compressed with no dictionary.
dcmpDict, e2 := gozstd.Decompress(nil, dictBytes)
if e2 != nil {
return nil, nil, e2
}

dict, e2 = gozstd.NewDDict(dcmpDict)
if e2 != nil {
return nil, nil, e2
}

ai.dictCache.Add(chunkRef.dict, dict)
}
}
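
Reviewer note: on a cache miss, getRaw now runs the full pipeline once per dictionary span: read the span, decompress it (dictionary spans are stored zstd-compressed with no dictionary), build the DDict, and cache it. A runnable sketch of both sides of that pipeline using gozstd's dictionary API; the raw-content dictionary bytes are made up for illustration:

package main

import (
	"fmt"

	"github.com/dolthub/gozstd"
)

func main() {
	// A raw-content dictionary (zstd accepts arbitrary bytes as a dict).
	rawDict := []byte("shared content that many chunks have in common")

	// Compression side, as the archive writer would store a chunk.
	cd, err := gozstd.NewCDict(rawDict)
	if err != nil {
		panic(err)
	}
	compressed := gozstd.CompressDict(nil, []byte("shared content, chunk 42"), cd)

	// Decompression side: build the DDict once, reuse it for every chunk
	// referencing the same span. This is the work the cache amortizes.
	dd, err := gozstd.NewDDict(rawDict)
	if err != nil {
		panic(err)
	}
	data, err := gozstd.DecompressDict(nil, compressed, dd)
	fmt.Println(string(data), err) // shared content, chunk 42 <nil>
}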

go/store/nbs/archive_test.go (38 changes: 22 additions, 16 deletions)
@@ -75,9 +75,9 @@ func TestArchiveSingleChunk(t *testing.T) {
}

func TestArchiveSingleChunkWithDictionary(t *testing.T) {
writer := NewFixedBufferByteSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 4096))
aw := newArchiveWriterWithSink(writer)
testDict := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
testDict, _ := generateTerribleDefaultDictionary()
testData := []byte{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
_, _ = aw.writeByteSpan(testDict)
_, _ = aw.writeByteSpan(testData)
@@ -103,19 +103,19 @@

dict, data, err := aIdx.getRaw(h)
assert.NoError(t, err)
assert.Equal(t, testDict, dict)
assert.NotNil(t, dict)
assert.Equal(t, testData, data)
}

func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
writer := NewFixedBufferByteSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 4096))
aw := newArchiveWriterWithSink(writer)
data1 := []byte{11, 11, 11, 11, 11, 11, 11, 11, 11, 11} // span 1
dict1 := []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1} // span 2
dict1, _ := generateDictionary(1) // span 2
data2 := []byte{22, 22, 22, 22, 22, 22, 22, 22, 22, 22} // span 3
data3 := []byte{33, 33, 33, 33, 33, 33, 33, 33, 33, 33} // span 4
data4 := []byte{44, 44, 44, 44, 44, 44, 44, 44, 44, 44} // span 5
dict2 := []byte{2, 2, 2, 2, 2, 2, 2, 2, 2, 2} // span 6
dict2, _ := generateDictionary(2) // span 6

h1 := hashWithPrefix(t, 42)
id, _ := aw.writeByteSpan(data1)
@@ -174,27 +174,27 @@
assert.Equal(t, data1, data)

dict, data, _ = aIdx.getRaw(h2)
assert.Equal(t, dict1, dict)
assert.NotNil(t, dict)
assert.Equal(t, data2, data)

dict, data, _ = aIdx.getRaw(h3)
assert.Equal(t, dict1, dict)
assert.NotNil(t, dict)
assert.Equal(t, data3, data)

dict, data, _ = aIdx.getRaw(h4)
assert.Nil(t, dict)
assert.Equal(t, data, data)

dict, data, _ = aIdx.getRaw(h5)
assert.Equal(t, dict2, dict)
assert.NotNil(t, dict)
assert.Equal(t, data1, data)

dict, data, _ = aIdx.getRaw(h6)
assert.Equal(t, dict2, dict)
assert.NotNil(t, dict)
assert.Equal(t, data1, data)

dict, data, _ = aIdx.getRaw(h7)
assert.Equal(t, dict1, dict)
assert.NotNil(t, dict)
assert.Equal(t, data3, data)
}

@@ -528,7 +528,7 @@ func TestArchiveChunkGroup(t *testing.T) {
// depend on the random data generated. If the random data generation changes, these numbers will
//
// The totalBytesSavedWDict is eyeballed to be correct.
defDict := generateTerribleDefaultDictionary()
_, defDict := generateTerribleDefaultDictionary()

cg := newChunkGroup(nil, generateSimilarChunks(42, 10), defDict)
assert.True(t, cg.totalRatioWDict >= 0.8666)
@@ -573,12 +573,18 @@ func hashWithPrefix(t *testing.T, prefix uint64) hash.Hash {
return hash.Hash(randomBytes)
}

// For tests which need a default dictionary, we generate a terrible one so it won't be used.
func generateTerribleDefaultDictionary() *gozstd.CDict {
chks := generateSimilarChunks(1977, 10)
// For tests which need a dictionary, we generate a terrible one because we don't care about the actual compression.
// We return both the raw form and the CDict form.
func generateTerribleDefaultDictionary() ([]byte, *gozstd.CDict) {
return generateDictionary(1977)
}

func generateDictionary(seed int64) ([]byte, *gozstd.CDict) {
chks := generateSimilarChunks(seed, 10)
rawDict := buildDictionary(chks)
cDict, _ := gozstd.NewCDict(rawDict)
return cDict
rawDict = gozstd.Compress(nil, rawDict)
return rawDict, cDict
}
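
Reviewer note: generateDictionary compresses rawDict before returning it, matching the on-disk convention this PR relies on: dictionary spans are stored zstd-compressed, so getRaw's cache-miss path must decompress the span before calling NewDDict. A small round-trip sketch of that convention (sample bytes are made up):

package main

import (
	"bytes"
	"fmt"

	"github.com/dolthub/gozstd"
)

func main() {
	rawDict := []byte("trained dictionary bytes")

	// Writer side: dictionary spans are stored compressed, no dictionary.
	stored := gozstd.Compress(nil, rawDict)

	// Reader side: decompress the span before handing it to NewDDict.
	recovered, err := gozstd.Decompress(nil, stored)
	fmt.Println(bytes.Equal(recovered, rawDict), err) // true <nil>
}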

func generateSimilarChunks(seed int64, count int) []*chunks.Chunk {