cloudimpl: cache suffix in remote file sst wrapper

Reading SSTs starts with multiple tiny reads at offsets near the end of the file.
If we can read that whole region once and fulfill those reads from a cached buffer,
we can avoid repeated RPCs.

Release note: none.
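To make that concrete, here is a minimal, hypothetical sketch of the idea: a suffix-caching wrapper around an io.ReaderAt. All names below are invented for illustration; the commit's actual implementation is the sstReader change in the diff that follows.

package suffixcache

import "io"

// suffixCachingReader is an illustrative wrapper around a "remote" ReaderAt:
// it prefetches the last n bytes of the file once and serves any read that
// falls inside that window from memory instead of issuing another RPC.
type suffixCachingReader struct {
	src  io.ReaderAt
	size int64 // total file size in bytes

	cachePos int64  // file offset where the cached suffix starts
	cacheBuf []byte // the cached suffix itself
}

// primeSuffix fetches and caches the last n bytes of the file (or the whole
// file, if it is smaller than n).
func (r *suffixCachingReader) primeSuffix(n int64) error {
	pos := r.size - n
	if pos < 0 {
		pos = 0
	}
	buf := make([]byte, r.size-pos)
	if _, err := r.src.ReadAt(buf, pos); err != nil && err != io.EOF {
		return err
	}
	r.cachePos, r.cacheBuf = pos, buf
	return nil
}

// ReadAt copies whatever the cached suffix covers and only falls back to the
// underlying source for the remainder, if any.
func (r *suffixCachingReader) ReadAt(p []byte, off int64) (int, error) {
	var n int
	if off >= r.cachePos && off < r.cachePos+int64(len(r.cacheBuf)) {
		n = copy(p, r.cacheBuf[off-r.cachePos:])
		if n == len(p) {
			return n, nil
		}
		off += int64(n)
	}
	if off >= r.size {
		return n, io.EOF
	}
	m, err := r.src.ReadAt(p[n:], off)
	return n + m, err
}

In the actual change below, the size of the cached suffix defaults to 64 KiB and is controlled by the new kv.bulk_ingest.stream_external_ssts.suffix_cache_size cluster setting.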
dt committed Jan 27, 2021
1 parent 9e8ff9f commit 56ec8f0
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions pkg/ccl/storageccl/import.go
@@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting(
true,
)

var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
"kv.bulk_ingest.stream_external_ssts.suffix_cache_size",
"size of suffix of remote SSTs to download and cache before reading from remote stream",
64<<10,
)

// commandMetadataEstimate is an estimate of how much metadata Raft will add to
// an AddSSTable command. It is intentionally a vast overestimate to avoid
// embedding intricate knowledge of the Raft encoding scheme here.
@@ -333,13 +339,21 @@ func ExternalSSTReader(
}

var reader sstable.ReadableFile = raw

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close()
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting, as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil {
f.Close()
return nil, err
}
}

iter, err := storage.NewSSTIterator(reader)
@@ -359,6 +373,15 @@ type sstReader struct {
pos int64

readPos int64 // readPos is used to transform Read() to ReadAt(readPos).

// This wrapper's primary purpose is reading SSTs, which starts with many
// tiny reads at a cluster of offsets near the end of the file. If we can read
// that whole region once and fulfill those reads from a cache, we can avoid
// repeated RPCs.
cache struct {
pos int64
buf []byte
}
}

// Close implements io.Closer.
@@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) {
return n, err
}

// readAndCacheSuffix caches the `size` suffix of the file (which could be the
// whole file) for use by later ReadAt calls to avoid making additional RPCs.
func (r *sstReader) readAndCacheSuffix(size int64) error {
if size == 0 {
return nil
}
r.cache.buf = nil
r.cache.pos = int64(r.sz) - size
if r.cache.pos <= 0 {
r.cache.pos = 0
}
reader, err := r.openAt(r.cache.pos)
if err != nil {
return err
}
defer reader.Close()
read, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
r.cache.buf = read
return nil
}

// ReadAt implements io.ReaderAt by opening a Reader at an offset before reading
// from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel
// calls.
func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
var read int
if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) {
read += copy(p, r.cache.buf[offset-r.cache.pos:])
if read == len(p) {
return read, nil
}
// Advance offset past the bytes served from the cache.
offset += int64(read)
}

if offset == int64(r.sz) {
return read, io.EOF
}

// Position the underlying reader at offset if needed.
if r.pos != offset {
if err := r.Close(); err != nil {
return 0, err
@@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
r.pos = offset
r.body = b
}

var err error
for n := 0; read < len(p); n, err = r.body.Read(p[read:]) {
read += n
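As a quick, self-contained illustration of the ReadAt arithmetic above (the helper name and the sizes here are made up, not part of the commit): a read that lies wholly inside the cached window is served from memory, a partial hit copies what it can and advances the offset, which may land exactly at end-of-file, and a miss copies nothing and must go to the remote stream.

package main

import "fmt"

// serveFromCache copies into p whatever of the requested range [off, off+len(p))
// is covered by a cached suffix starting at file offset cachePos, returning the
// number of bytes copied; 0 means the read must go to the underlying stream.
func serveFromCache(cacheBuf []byte, cachePos int64, p []byte, off int64) int {
	if off >= cachePos && off < cachePos+int64(len(cacheBuf)) {
		return copy(p, cacheBuf[off-cachePos:])
	}
	return 0
}

func main() {
	// Pretend a 1000-byte file whose last 100 bytes ([900, 1000)) are cached.
	const fileSize = 1000
	cachePos := int64(fileSize - 100)
	cacheBuf := make([]byte, 100)

	fmt.Println(serveFromCache(cacheBuf, cachePos, make([]byte, 10), 950)) // 10: served entirely from cache
	partial := make([]byte, 80)
	n := serveFromCache(cacheBuf, cachePos, partial, 960)
	fmt.Println(n, 960+int64(n)) // 40 1000: partial hit; the advanced offset equals the file size, i.e. EOF
	fmt.Println(serveFromCache(cacheBuf, cachePos, make([]byte, 10), 100)) // 0: miss; must read from the stream
}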
