diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index ac074f8e023c..61d97da7e10f 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -618,7 +618,7 @@ func debugDumpFileSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, fileInfoPath, encOpts) if err != nil { return err } @@ -665,7 +665,7 @@ func DebugDumpMetadataSST( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts) if err != nil { return err } @@ -805,7 +805,7 @@ func NewBackupMetadata( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, exportStore, sstFileName, encOpts) if err != nil { return nil, err } @@ -922,7 +922,7 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator { break } - iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, b.store, path, encOpts) if err != nil { return FileIterator{err: err} } @@ -1232,7 +1232,7 @@ func makeBytesIter( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts) if err != nil { return bytesIter{iterError: err} } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 0c9ca8de4e06..00e62c7c4054 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ 
-278,18 +278,13 @@ func (rd *restoreDataProcessor) openSSTs( ) error { ctxDone := ctx.Done() - // The sstables only contain MVCC data and no intents, so using an MVCC - // iterator is sufficient. - var iters []storage.SimpleMVCCIterator + // TODO(msbutler): use a map of external storage factories to avoid reopening the same dir + // in a given restore span entry var dirs []cloud.ExternalStorage // If we bail early and haven't handed off responsibility of the dirs/iters to // the channel, close anything that we had open. defer func() { - for _, iter := range iters { - iter.Close() - } - for _, dir := range dirs { if err := dir.Close(); err != nil { log.Warningf(ctx, "close export storage failed %v", err) @@ -297,18 +292,13 @@ func (rd *restoreDataProcessor) openSSTs( } }() - // sendIters sends all of the currently accumulated iterators over the + // sendIter sends a multiplexed iterator covering the currently accumulated files over the // channel. - sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { - multiIter := storage.MakeMultiIterator(itersToSend) - readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) + sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { + readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { readAsOfIter.Close() - multiIter.Close() - for _, iter := range itersToSend { - iter.Close() - } for _, dir := range dirsToSend { if err := dir.Close(); err != nil { @@ -329,13 +319,13 @@ func (rd *restoreDataProcessor) openSSTs( return ctx.Err() } - iters = make([]storage.SimpleMVCCIterator, 0) dirs = make([]cloud.ExternalStorage, 0) return nil } log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + filePaths := make([]string, 0, len(entry.Files)) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, 
entry.Span.Key) @@ -344,17 +334,23 @@ func (rd *restoreDataProcessor) openSSTs( return err } dirs = append(dirs, dir) + filePaths = append(filePaths, file.Path) // TODO(pbardea): When memory monitoring is added, send the currently // accumulated iterators on the channel if we run into memory pressure. - iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption) - if err != nil { - return err - } - iters = append(iters, iter) } - - return sendIters(iters, dirs) + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: rd.spec.RestoreTime, + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, dirs, filePaths, rd.spec.Encryption, + iterOpts) + if err != nil { + return err + } + return sendIter(iter, dirs) } func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 42459e3a5c32..74a9d7d63c9a 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -591,7 +591,7 @@ func makeIters( return nil, nil, errors.Wrapf(err, "making external storage") } - iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil) + iters[i], err = storageccl.DeprecatingExternalSSTReader(ctx, dirStorage[i], file.Path, nil) if err != nil { return nil, nil, errors.Wrapf(err, "fetching sst reader") } diff --git a/pkg/ccl/storageccl/external_sst_reader.go b/pkg/ccl/storageccl/external_sst_reader.go index 5d8e4f7dcc56..c5a8e73f7e1f 100644 --- a/pkg/ccl/storageccl/external_sst_reader.go +++ b/pkg/ccl/storageccl/external_sst_reader.go @@ -25,6 +25,8 @@ import ( "github.com/cockroachdb/pebble/sstable" ) +// remoteSSTs lets external SSTables get iterated directly in some cases, +// rather than being downloaded entirely first. 
var remoteSSTs = settings.RegisterBoolSetting( settings.TenantWritable, "kv.bulk_ingest.stream_external_ssts.enabled", @@ -39,12 +41,126 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( 64<<10, ) -// ExternalSSTReader returns opens an SST in external storage, optionally -// decrypting with the supplied parameters, and returns iterator over it. +func getFileWithRetry( + ctx context.Context, basename string, e cloud.ExternalStorage, +) (ioctx.ReadCloserCtx, int64, error) { + // Open the file at offset 0, retrying up to maxAttempts times, to learn its + // size, which is needed e.g. to read the trailer. + var f ioctx.ReadCloserCtx + var sz int64 + const maxAttempts = 3 + if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { + var err error + f, sz, err = e.ReadFileAt(ctx, basename, 0) + return err + }); err != nil { + return nil, 0, err + } + return f, sz, nil +} + +// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from +// external storage, optionally decrypting with the supplied parameters. +// +// ctx is used only while the files are being fetched and decrypted; it is not +// passed on to the returned iterator. 
+func newMemPebbleSSTReader( + ctx context.Context, + e []cloud.ExternalStorage, + basenames []string, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + // Each file is read fully into memory (and decrypted when encryption is set). + inMemorySSTs := make([][]byte, 0, len(basenames)) + // e[i] is the storage that holds basenames[i]. + for i, basename := range basenames { + f, _, err := getFileWithRetry(ctx, basename, e[i]) + if err != nil { + return nil, err + } + content, err := ioctx.ReadAll(ctx, f) + f.Close(ctx) + if err != nil { + return nil, err + } + if encryption != nil { + content, err = DecryptFile(ctx, content, encryption.Key, nil /* mm */) + if err != nil { + return nil, err + } + } + inMemorySSTs = append(inMemorySSTs, content) + } + return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps) +} + +// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage, +// optionally decrypting with the supplied parameters. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. 
func ExternalSSTReader( + ctx context.Context, + e []cloud.ExternalStorage, + basenames []string, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + if !remoteSSTs.Get(&e[0].Settings().SV) { + return newMemPebbleSSTReader(ctx, e, basenames, encryption, iterOps) + } + remoteCacheSize := remoteSSTSuffixCacheSize.Get(&e[0].Settings().SV) + readers := make([]sstable.ReadableFile, 0, len(basenames)) + // Open one reader per file; e[i] is the storage that holds basenames[i]. + for i, basename := range basenames { + f, sz, err := getFileWithRetry(ctx, basename, e[i]) + if err != nil { + return nil, err + } + store, name := e[i], basename // per-iteration copies; openAt may be invoked after the loop has advanced + raw := &sstReader{ + ctx: ctx, + sz: sizeStat(sz), + body: f, + openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { + reader, _, err := store.ReadFileAt(ctx, name, offset) + return reader, err + }, + } + + var reader sstable.ReadableFile + + if encryption != nil { + r, err := decryptingReader(raw, encryption.Key) + if err != nil { + f.Close(ctx) + return nil, err + } + reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. + if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil { + f.Close(ctx) + return nil, err + } + reader = raw + } + readers = append(readers, reader) + } + return storage.NewPebbleSSTIterator(readers, iterOps) +} + +// DeprecatingExternalSSTReader opens an SST in external storage, optionally +// decrypting with the supplied parameters, and returns an iterator over it. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. 
+// +// TODO(msbutler): replace all current calls with the new ExternalSSTReader, +// as this reader does not handle range keys. +func DeprecatingExternalSSTReader( ctx context.Context, e cloud.ExternalStorage, basename string, diff --git a/pkg/storage/multi_iterator.go b/pkg/storage/multi_iterator.go index 5a4d248f12fa..0f4c2f00561a 100644 --- a/pkg/storage/multi_iterator.go +++ b/pkg/storage/multi_iterator.go @@ -20,6 +20,8 @@ import ( const invalidIdxSentinel = -1 // multiIterator multiplexes iteration over a number of SimpleMVCCIterators. +// +// TODO(msbutler): remove the multiIterator and replace all uses with PebbleSSTIterator. type multiIterator struct { iters []SimpleMVCCIterator // The index into `iters` of the iterator currently being pointed at.