Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storageccl: use NewPebbleIterator in restore data processor #83984

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func debugDumpFileSST(
}
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}
iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, fileInfoPath, encOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -665,7 +665,7 @@ func DebugDumpMetadataSST(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -805,7 +805,7 @@ func NewBackupMetadata(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, exportStore, sstFileName, encOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -922,7 +922,7 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator {
break
}

iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, b.store, path, encOpts)
if err != nil {
return FileIterator{err: err}
}
Expand Down Expand Up @@ -1232,7 +1232,7 @@ func makeBytesIter(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts)
if err != nil {
return bytesIter{iterError: err}
}
Expand Down
42 changes: 19 additions & 23 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,37 +278,27 @@ func (rd *restoreDataProcessor) openSSTs(
) error {
ctxDone := ctx.Done()

// The sstables only contain MVCC data and no intents, so using an MVCC
// iterator is sufficient.
var iters []storage.SimpleMVCCIterator
// TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir
// in a given restore span entry
var dirs []cloud.ExternalStorage

// If we bail early and haven't handed off responsibility of the dirs/iters to
// the channel, close anything that we had open.
defer func() {
for _, iter := range iters {
iter.Close()
}

for _, dir := range dirs {
if err := dir.Close(); err != nil {
log.Warningf(ctx, "close export storage failed %v", err)
}
}
}()

// sendIters sends all of the currently accumulated iterators over the
// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
multiIter := storage.MakeMultiIterator(itersToSend)
readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime)
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
readAsOfIter.Close()
multiIter.Close()
for _, iter := range itersToSend {
iter.Close()
}

for _, dir := range dirsToSend {
if err := dir.Close(); err != nil {
Expand All @@ -329,13 +319,13 @@ func (rd *restoreDataProcessor) openSSTs(
return ctx.Err()
}

iters = make([]storage.SimpleMVCCIterator, 0)
dirs = make([]cloud.ExternalStorage, 0)
return nil
}

log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)

filePaths := make([]string, 0, len(EntryFiles{}))
for _, file := range entry.Files {
log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)

Expand All @@ -344,17 +334,23 @@ func (rd *restoreDataProcessor) openSSTs(
return err
}
dirs = append(dirs, dir)
filePaths = append(filePaths, file.Path)

// TODO(pbardea): When memory monitoring is added, send the currently
// accumulated iterators on the channel if we run into memory pressure.
iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption)
if err != nil {
return err
}
iters = append(iters, iter)
}

return sendIters(iters, dirs)
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: rd.spec.RestoreTime,
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
iter, err := storageccl.ExternalSSTReader(ctx, dirs, filePaths, rd.spec.Encryption,
iterOpts)
if err != nil {
return err
}
return sendIter(iter, dirs)
}

func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func makeIters(
return nil, nil, errors.Wrapf(err, "making external storage")
}

iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil)
iters[i], err = storageccl.DeprecatingExternalSSTReader(ctx, dirStorage[i], file.Path, nil)
if err != nil {
return nil, nil, errors.Wrapf(err, "fetching sst reader")
}
Expand Down
120 changes: 118 additions & 2 deletions pkg/ccl/storageccl/external_sst_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/pebble/sstable"
)

// RemoteSSTs lets external SSTables get iterated directly in some cases,
// rather than being downloaded entirely first.
var remoteSSTs = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.bulk_ingest.stream_external_ssts.enabled",
Expand All @@ -39,12 +41,126 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
64<<10,
)

// ExternalSSTReader returns opens an SST in external storage, optionally
// decrypting with the supplied parameters, and returns iterator over it.
func getFileWithRetry(
ctx context.Context, basename string, e cloud.ExternalStorage,
) (ioctx.ReadCloserCtx, int64, error) {
// Do an initial read of the file, from the beginning, to get the file size as
// this is used e.g. to read the trailer.
var f ioctx.ReadCloserCtx
var sz int64
const maxAttempts = 3
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error {
var err error
f, sz, err = e.ReadFileAt(ctx, basename, 0)
return err
}); err != nil {
return nil, 0, err
}
return f, sz, nil
}

// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from
// external storage, optionally decrypting with the supplied parameters.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func newMemPebbleSSTReader(
ctx context.Context,
e []cloud.ExternalStorage,
basenames []string,
encryption *roachpb.FileEncryptionOptions,
iterOps storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {

inMemorySSTs := make([][]byte, 0, len(basenames))

for i, basename := range basenames {
f, _, err := getFileWithRetry(ctx, basename, e[i])
if err != nil {
return nil, err
}
content, err := ioctx.ReadAll(ctx, f)
f.Close(ctx)
if err != nil {
return nil, err
}
if encryption != nil {
content, err = DecryptFile(ctx, content, encryption.Key, nil /* mm */)
if err != nil {
return nil, err
}
}
inMemorySSTs = append(inMemorySSTs, content)
}
return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps)
}

// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage,
// optionally decrypting with the supplied parameters.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func ExternalSSTReader(
ctx context.Context,
e []cloud.ExternalStorage,
basenames []string,
encryption *roachpb.FileEncryptionOptions,
iterOps storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {
if !remoteSSTs.Get(&e[0].Settings().SV) {
return newMemPebbleSSTReader(ctx, e, basenames, encryption, iterOps)
}
remoteCacheSize := remoteSSTSuffixCacheSize.Get(&e[0].Settings().SV)
readers := make([]sstable.ReadableFile, 0, len(basenames))

for i, basename := range basenames {
f, sz, err := getFileWithRetry(ctx, basename, e[i])
if err != nil {
return nil, err
}

raw := &sstReader{
ctx: ctx,
sz: sizeStat(sz),
body: f,
openAt: func(offset int64) (ioctx.ReadCloserCtx, error) {
reader, _, err := e[i].ReadFileAt(ctx, basename, offset)
return reader, err
},
}

var reader sstable.ReadableFile

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close(ctx)
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil {
f.Close(ctx)
return nil, err
}
reader = raw
}
readers = append(readers, reader)
}
return storage.NewPebbleSSTIterator(readers, iterOps)
}

// DeprecatingExternalSSTReader returns opens an SST in external storage, optionally
// decrypting with the supplied parameters, and returns iterator over it.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
//
// TODO (msbutler): replace all current calls with new ExternalSSTReader,
// as it does not handle range keys
func DeprecatingExternalSSTReader(
ctx context.Context,
e cloud.ExternalStorage,
basename string,
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/multi_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
const invalidIdxSentinel = -1

// multiIterator multiplexes iteration over a number of SimpleMVCCIterators.
//
// TODO (msbutler): remove the multiIterator and replace all uses with PebbleSSTIterator
type multiIterator struct {
iters []SimpleMVCCIterator
// The index into `iters` of the iterator currently being pointed at.
Expand Down