Commit 437b7b0

PR feedback to simplify on tableReaderAt
macneale4 committed Feb 26, 2025
1 parent 9d74c21 commit 437b7b0
Showing 16 changed files with 166 additions and 272 deletions.
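
The theme of this commit: the bespoke flexTableReaderAt wrapper is dropped in favor of the existing tableReaderAt abstraction, with context.Context and *Stats threaded through every read path. The diff never shows the interface definition itself, but its shape can be inferred from the call sites below. The following is a sketch of that inferred shape only (assuming it lives in the surrounding nbs package), not code copied from the repository:

// Inferred sketch: method signatures are deduced from call sites in this diff
// (ReadAtWithStats via readerAtWithStatsBridge, Reader via
// archiveChunkSource.reader, clone via archiveReader.clone).
type ReaderAtWithStats interface {
	// ReadAt semantics, plus a context for cancellation and a Stats
	// collector for read accounting.
	ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error)
}

type tableReaderAt interface {
	ReaderAtWithStats
	// Reader streams the entire underlying table file.
	Reader(ctx context.Context) (io.ReadCloser, error)
	// clone returns an independent reader, e.g. one with its own file handle.
	clone() (tableReaderAt, error)
}
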
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/archive.go
@@ -92,7 +92,7 @@ func (cmd ArchiveCmd) Exec(ctx context.Context, commandStr string, args []string
return 1
}

storageMetadata, err := env.GetMultiEnvStorageMetadata(dEnv.FS)
storageMetadata, err := env.GetMultiEnvStorageMetadata(ctx, dEnv.FS)
if err != nil {
cli.PrintErrln(err)
return 1
4 changes: 2 additions & 2 deletions go/libraries/doltcore/env/multi_repo_env.go
@@ -67,7 +67,7 @@ func (sms StorageMetadataMap) ArchiveFilesPresent() bool {
return false
}

func GetMultiEnvStorageMetadata(dataDirFS filesys.Filesys) (StorageMetadataMap, error) {
func GetMultiEnvStorageMetadata(ctx context.Context, dataDirFS filesys.Filesys) (StorageMetadataMap, error) {
dbMap := make(map[string]filesys.Filesys)

path, err := dataDirFS.Abs("")
@@ -115,7 +115,7 @@ func GetMultiEnvStorageMetadata(dataDirFS filesys.Filesys) (StorageMetadataMap,
return nil, err
}

sm, err := nbs.GetStorageMetadata(fsStr)
sm, err := nbs.GetStorageMetadata(ctx, fsStr, &nbs.Stats{})
if err != nil {
return nil, err
}
11 changes: 3 additions & 8 deletions go/store/nbs/archive_build.go
@@ -441,19 +441,14 @@ func gatherAllChunks(ctx context.Context, cs chunkSource, idx tableIndex, stats

return chkCache, defaultSamples, nil
}
func verifyAllChunks(ctx context.Context, idx tableIndex, archiveFile string, progress chan interface{}, stats *Stats) error {
file, err := os.Open(archiveFile)
if err != nil {
return err
}

stat, err := file.Stat()
func verifyAllChunks(ctx context.Context, idx tableIndex, archiveFile string, progress chan interface{}, stats *Stats) error {
fra, err := newFileReaderAt(archiveFile)
if err != nil {
return err
}
fileSize := stat.Size()

index, err := newArchiveReader(newFlexibleReader(file), uint64(fileSize))
index, err := newArchiveReader(ctx, fra, uint64(fra.sz), stats)
if err != nil {
return err
}
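
newFileReaderAt is not shown in this diff, but the code it replaces (an os.Open followed by Stat) and the fra.sz field used above suggest roughly this shape. Treat it as an inferred sketch, not the repository's actual definition:

// Inferred sketch: a file-backed reader that caches the file size, matching
// the fra.sz usage in verifyAllChunks above.
type fileReaderAt struct {
	f  *os.File
	sz int64
}

func newFileReaderAt(path string) (*fileReaderAt, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	st, err := f.Stat()
	if err != nil {
		f.Close() // don't leak the handle if Stat fails
		return nil, err
	}
	return &fileReaderAt{f: f, sz: st.Size()}, nil
}
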
69 changes: 14 additions & 55 deletions go/store/nbs/archive_chunk_source.go
@@ -18,7 +18,6 @@ import (
"context"
"encoding/binary"
"io"
"os"
"path/filepath"

"github.com/pkg/errors"
@@ -35,15 +34,15 @@ type archiveChunkSource struct {

var _ chunkSource = &archiveChunkSource{}

func newArchiveChunkSource(ctx context.Context, dir string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (archiveChunkSource, error) {
func newArchiveChunkSource(ctx context.Context, dir string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (archiveChunkSource, error) {
archiveFile := filepath.Join(dir, h.String()+ArchiveFileSuffix)

file, size, err := openFileReader(archiveFile)
fra, err := newFileReaderAt(archiveFile)
if err != nil {
return archiveChunkSource{}, err
}

aRdr, err := newArchiveReader(file, size)
aRdr, err := newArchiveReader(ctx, fra, uint64(fra.sz), stats)
if err != nil {
return archiveChunkSource{}, err
}
Expand All @@ -60,33 +59,18 @@ func newAWSArchiveChunkSource(ctx context.Context,

footer := make([]byte, archiveFooterSize)
// sz is what we are really after here, but we'll use the bytes to construct the footer to avoid another call.
_, sz, err := s3.readRange(ctx, name, footer, httpEndRangeHeader(int(archiveFooterSize)))
_, sz, err := s3.readS3ObjectFromEnd(ctx, name, footer, stats)
if err != nil {
return emptyChunkSource{}, err
}

rdr := newFlexibleReader(s3ReaderAtWithStats{name, s3})
aRdr, err := newArchiveReaderFromFooter(rdr, sz, footer)
aRdr, err := newArchiveReaderFromFooter(ctx, &s3TableReaderAt{s3, name}, sz, footer, stats)
if err != nil {
return archiveChunkSource{}, err
}
return archiveChunkSource{"", aRdr}, nil
}
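
readS3ObjectFromEnd collapses what used to be two concerns, fetching the footer bytes and learning the object size, into one ranged request. The trick is an HTTP suffix range: ask for the last N bytes, and the Content-Range response header reveals the total object length. Below is a generic net/http illustration of that pattern; it is not Dolt's actual AWS client code, which goes through the S3 SDK:

import (
	"context"
	"fmt"
	"io"
	"net/http"
)

// readFromEnd fetches the last n bytes of an HTTP object and returns them
// along with the object's total size, parsed from Content-Range.
func readFromEnd(ctx context.Context, url string, n int) ([]byte, int64, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, 0, err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=-%d", n)) // suffix range: last n bytes
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return nil, 0, fmt.Errorf("expected 206 Partial Content, got %s", resp.Status)
	}

	// Content-Range looks like "bytes 990-999/1000"; the denominator is the size.
	var start, end, total int64
	if _, err := fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &start, &end, &total); err != nil {
		return nil, 0, err
	}
	buf, err := io.ReadAll(resp.Body)
	return buf, total, err
}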

func openFileReader(file string) (flexTableReaderAt, uint64, error) {
f, err := os.Open(file)
if err != nil {
return flexTableReaderAt{}, 0, err
}

stat, err := f.Stat()
if err != nil {
return flexTableReaderAt{}, 0, err
}

return newFlexibleReader(f), uint64(stat.Size()), nil
}

func (acs archiveChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
res := acs.aRdr.has(h)
if res && keeper != nil && keeper(h) {
@@ -174,11 +158,11 @@ func (acs archiveChunkSource) currentSize() uint64 {

// reader returns a reader for the entire archive file.
func (acs archiveChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
rdr := acs.aRdr.reader
fz := acs.currentSize()

rc := io.NewSectionReader(rdr, 0, int64(fz))
return io.NopCloser(rc), fz, nil
rd, err := acs.aRdr.reader.Reader(ctx)
if err != nil {
return nil, 0, err
}
return rd, acs.currentSize(), nil
}
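
The old body of reader (an io.SectionReader over the whole file, wrapped in io.NopCloser) has not disappeared; it has effectively moved behind the Reader(ctx) method, where each tableReaderAt implementation can choose its own streaming strategy. For the file-backed sketch above, the pre-change logic remains a plausible implementation:

// Plausible Reader implementation for the fileReaderAt sketch: the same
// SectionReader + NopCloser construction the old call site built inline.
func (r *fileReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
	return io.NopCloser(io.NewSectionReader(r.f, 0, r.sz)), nil
}
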
func (acs archiveChunkSource) uncompressedLen() (uint64, error) {
return 0, errors.New("Archive chunk source does not support uncompressedLen")
@@ -189,36 +173,11 @@ func (acs archiveChunkSource) index() (tableIndex, error) {
}

func (acs archiveChunkSource) clone() (chunkSource, error) {
var newRdr *flexTableReaderAt

// We get into the guts a little bit on flexTableReaderAt here. There isn't a great
// way to support cloning of IO mechanisms in a generic way.
if acs.aRdr.reader.ioRdr != nil {
// io.File is the only type we expect.
rdr, _ := (*acs.aRdr.reader.ioRdr).(io.ReaderAt)
if _, ok := rdr.(*os.File); ok {
newFile, err := os.Open(acs.file)
if err != nil {
return nil, err
}
fr := newFlexibleReader(newFile)
newRdr = &fr
}
} else if acs.aRdr.reader.nbsRdrWs != nil {
// Currently we only expect the s3ReaderAtWithStats instances here.
rdr, _ := (*acs.aRdr.reader.nbsRdrWs).(ReaderAtWithStats)
if _, ok := rdr.(s3ReaderAtWithStats); ok {
fr := newFlexibleReader(rdr)
newRdr = &fr
}
}

if newRdr == nil {
return nil, errors.New("runtime error: unable to clone reader")
reader, err := acs.aRdr.clone()
if err != nil {
return nil, err
}

rdr := acs.aRdr.clone(*newRdr)
return archiveChunkSource{acs.file, rdr}, nil
return archiveChunkSource{acs.file, reader}, nil
}
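
The deleted type-switching above shows why this simplification matters: cloning used to require the chunk source to know every concrete reader type (os.File, s3ReaderAtWithStats). Pushing clone onto the interface lets each implementation reopen its own resource. For the file-backed sketch, that is presumably just another os.Open, mirroring the removed os.File branch:

// Plausible clone for the fileReaderAt sketch: reopen the file so the clone
// owns an independent handle, as the removed os.File branch did above.
func (r *fileReaderAt) clone() (tableReaderAt, error) {
	return newFileReaderAt(r.f.Name())
}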

func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
72 changes: 44 additions & 28 deletions go/store/nbs/archive_reader.go
@@ -34,7 +34,7 @@ import (
// archiveReader is a reader for the archive format. We use primitive type slices where possible. These are read directly
// from disk into memory for speed. The downside is complexity on the read path, but it's all constant time.
type archiveReader struct {
reader flexTableReaderAt
reader tableReaderAt
prefixes []uint64
spanIndex []uint64
chunkRefs []uint32 // Pairs of uint32s. First is the dict id, second is the data id.
@@ -104,8 +104,8 @@ func (f archiveFooter) metadataSpan() byteSpan {
return byteSpan{offset: f.fileSize - archiveFooterSize - uint64(f.metadataSize), length: uint64(f.metadataSize)}
}

func newArchiveMetadata(reader flexTableReaderAt, fileSize uint64) (*ArchiveMetadata, error) {
footer, err := loadFooter(reader, fileSize)
func newArchiveMetadata(ctx context.Context, reader tableReaderAt, fileSize uint64, stats *Stats) (*ArchiveMetadata, error) {
footer, err := loadFooter(ctx, reader, fileSize, stats)
if err != nil {
return nil, err
}
@@ -115,7 +115,7 @@ func newArchiveMetadata(reader flexTableReaderAt, fileSize uint64) (*ArchiveMeta
}

metaSpan := footer.metadataSpan()
metaRdr := io.NewSectionReader(reader, int64(metaSpan.offset), int64(metaSpan.length))
metaRdr := newSectionReader(ctx, reader, int64(metaSpan.offset), int64(metaSpan.length), stats)

// Read the data into a byte slice
metaData := make([]byte, metaSpan.length)
@@ -136,7 +136,7 @@ }, nil
}, nil
}

func newArchiveReaderFromFooter(reader flexTableReaderAt, fileSz uint64, footer []byte) (archiveReader, error) {
func newArchiveReaderFromFooter(ctx context.Context, reader tableReaderAt, fileSz uint64, footer []byte, stats *Stats) (archiveReader, error) {
if uint64(len(footer)) != archiveFooterSize {
return archiveReader{}, errors.New("runtime error: invalid footer.")
}
@@ -146,22 +146,21 @@ func newArchiveReaderFromFooter(reader flexTableReaderAt, fileSz uint64, footer
return archiveReader{}, err
}

return buildArchiveReader(reader, fileSz, ftr)
return buildArchiveReader(ctx, reader, ftr, stats)
}

func newArchiveReader(reader flexTableReaderAt, fileSize uint64) (archiveReader, error) {
footer, err := loadFooter(reader, fileSize)
func newArchiveReader(ctx context.Context, reader tableReaderAt, fileSize uint64, stats *Stats) (archiveReader, error) {
footer, err := loadFooter(ctx, reader, fileSize, stats)
if err != nil {
return archiveReader{}, err
}

return buildArchiveReader(reader, fileSize, footer)
return buildArchiveReader(ctx, reader, footer, stats)
}

func buildArchiveReader(reader flexTableReaderAt, fileSize uint64, footer archiveFooter) (archiveReader, error) {

func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiveFooter, stats *Stats) (archiveReader, error) {
byteOffSpan := footer.indexByteOffsetSpan()
secRdr := io.NewSectionReader(reader, int64(byteOffSpan.offset), int64(byteOffSpan.length))
secRdr := newSectionReader(ctx, reader, int64(byteOffSpan.offset), int64(byteOffSpan.length), stats)
byteSpans := make([]uint64, footer.byteSpanCount+1)
byteSpans[0] = 0 // Null byteSpan to simplify logic.
err := binary.Read(secRdr, binary.BigEndian, byteSpans[1:])
@@ -170,23 +169,23 @@ func buildArchiveReader(reader flexTableReaderAt, fileSize uint64, footer archiv
}

prefixSpan := footer.indexPrefixSpan()
prefixRdr := io.NewSectionReader(reader, int64(prefixSpan.offset), int64(prefixSpan.length))
prefixRdr := newSectionReader(ctx, reader, int64(prefixSpan.offset), int64(prefixSpan.length), stats)
prefixes := make([]uint64, footer.chunkCount)
err = binary.Read(prefixRdr, binary.BigEndian, prefixes[:])
if err != nil {
return archiveReader{}, err
}

chunkRefSpan := footer.indexChunkRefSpan()
chunkRdr := io.NewSectionReader(reader, int64(chunkRefSpan.offset), int64(chunkRefSpan.length))
chunkRdr := newSectionReader(ctx, reader, int64(chunkRefSpan.offset), int64(chunkRefSpan.length), stats)
chunks := make([]uint32, footer.chunkCount*2)
err = binary.Read(chunkRdr, binary.BigEndian, chunks[:])
if err != nil {
return archiveReader{}, err
}

suffixSpan := footer.indexSuffixSpan()
sufRdr := io.NewSectionReader(reader, int64(suffixSpan.offset), int64(suffixSpan.length))
sufRdr := newSectionReader(ctx, reader, int64(suffixSpan.offset), int64(suffixSpan.length), stats)
suffixes := make([]byte, footer.chunkCount*hash.SuffixLen)
_, err = io.ReadFull(sufRdr, suffixes)
if err != nil {
Expand All @@ -211,21 +210,38 @@ func buildArchiveReader(reader flexTableReaderAt, fileSize uint64, footer archiv

// clone returns a new archiveReader with a freshly cloned reader. All other fields are immutable or thread safe,
// so they are copied.
func (ar archiveReader) clone(newReader flexTableReaderAt) archiveReader {
func (ar archiveReader) clone() (archiveReader, error) {
reader, err := ar.reader.clone()
if err != nil {
return archiveReader{}, err
}
return archiveReader{
reader: newReader,
reader: reader,
prefixes: ar.prefixes,
spanIndex: ar.spanIndex,
chunkRefs: ar.chunkRefs,
suffixes: ar.suffixes,
footer: ar.footer,
dictCache: ar.dictCache, // cache is thread safe.
}
}, nil
}

func loadFooter(reader io.ReaderAt, fileSize uint64) (f archiveFooter, err error) {
section := io.NewSectionReader(reader, int64(fileSize-archiveFooterSize), int64(archiveFooterSize))
type readerAtWithStatsBridge struct {
reader ReaderAtWithStats
ctx context.Context
stats *Stats
}

func (r readerAtWithStatsBridge) ReadAt(p []byte, off int64) (int, error) {
return r.reader.ReadAtWithStats(r.ctx, p, off, r.stats)
}

func newSectionReader(ctx context.Context, rd ReaderAtWithStats, off, len int64, stats *Stats) *io.SectionReader {
return io.NewSectionReader(readerAtWithStatsBridge{rd, ctx, stats}, off, len)
}
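
readerAtWithStatsBridge is a small adapter: it pins a context.Context and a *Stats alongside a ReaderAtWithStats so the result satisfies plain io.ReaderAt, which is what io.SectionReader (and therefore binary.Read) requires. Storing a context in a struct field is normally discouraged in Go, but it is reasonable here because the bridge lives only for the duration of a single read. A hypothetical helper (not in the diff) showing the payoff:

// readUint64At reads one big-endian uint64 at an arbitrary offset through
// any stats-aware reader, via the bridge above.
func readUint64At(ctx context.Context, rd ReaderAtWithStats, off int64, stats *Stats) (uint64, error) {
	sec := newSectionReader(ctx, rd, off, 8, stats)
	var v uint64
	err := binary.Read(sec, binary.BigEndian, &v)
	return v, err
}
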

func loadFooter(ctx context.Context, reader ReaderAtWithStats, fileSize uint64, stats *Stats) (f archiveFooter, err error) {
section := newSectionReader(ctx, reader, int64(fileSize-archiveFooterSize), int64(archiveFooterSize), stats)
buf := make([]byte, archiveFooterSize)
_, err = io.ReadFull(section, buf)
if err != nil {
@@ -411,18 +427,18 @@ func (ar archiveReader) getMetadata(ctx context.Context, stats *Stats) ([]byte,

// verifyDataCheckSum verifies the checksum of the data section of the archive. Note - this requires a full read of
// the data section, which could be sizable.
func (ar archiveReader) verifyDataCheckSum() error {
return verifyCheckSum(ar.reader, ar.footer.dataSpan(), ar.footer.dataCheckSum)
func (ar archiveReader) verifyDataCheckSum(ctx context.Context, stats *Stats) error {
return verifyCheckSum(ctx, ar.reader, ar.footer.dataSpan(), ar.footer.dataCheckSum, stats)
}

// verifyIndexCheckSum verifies the checksum of the index section of the archive.
func (ar archiveReader) verifyIndexCheckSum() error {
return verifyCheckSum(ar.reader, ar.footer.totalIndexSpan(), ar.footer.indexCheckSum)
func (ar archiveReader) verifyIndexCheckSum(ctx context.Context, stats *Stats) error {
return verifyCheckSum(ctx, ar.reader, ar.footer.totalIndexSpan(), ar.footer.indexCheckSum, stats)
}

// verifyMetaCheckSum verifies the checksum of the metadata section of the archive.
func (ar archiveReader) verifyMetaCheckSum() error {
return verifyCheckSum(ar.reader, ar.footer.metadataSpan(), ar.footer.metaCheckSum)
func (ar archiveReader) verifyMetaCheckSum(ctx context.Context, stats *Stats) error {
return verifyCheckSum(ctx, ar.reader, ar.footer.metadataSpan(), ar.footer.metaCheckSum, stats)
}

func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error, stats *Stats) error {
@@ -448,9 +464,9 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
return nil
}

func verifyCheckSum(reader flexTableReaderAt, span byteSpan, checkSum sha512Sum) error {
func verifyCheckSum(ctx context.Context, reader tableReaderAt, span byteSpan, checkSum sha512Sum, stats *Stats) error {
hshr := sha512.New()
_, err := io.Copy(hshr, io.NewSectionReader(reader, int64(span.offset), int64(span.length)))
_, err := io.Copy(hshr, newSectionReader(ctx, reader, int64(span.offset), int64(span.length), stats))
if err != nil {
return err
}
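
The tail of verifyCheckSum is truncated by the diff view, but the pattern is clear: stream the span through a SHA-512 hasher with io.Copy rather than buffering it, then presumably compare against the footer's stored sha512Sum. A self-contained version of the same pattern over a plain file, for reference:

import (
	"crypto/sha512"
	"fmt"
	"io"
	"os"
)

// verifyFileSection hashes a byte range of a file without loading it all
// into memory, then compares digests.
func verifyFileSection(path string, off, length int64, want [sha512.Size]byte) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	h := sha512.New()
	if _, err := io.Copy(h, io.NewSectionReader(f, off, length)); err != nil {
		return err
	}

	var got [sha512.Size]byte
	copy(got[:], h.Sum(nil))
	if got != want {
		return fmt.Errorf("checksum mismatch in %s", path)
	}
	return nil
}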
