From e12b1f196064490ca0b6b41248575483795ed397 Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Mon, 30 Jan 2023 15:02:56 -0500 Subject: [PATCH] backupccl: move descriptors and descriptor changes field from manifest to SST As part of an effort to make backup manifests scale better for larger clusters, this patch moves descriptors and descriptor changes from the manifest to an external SST. This avoids the need to alloc enough memory to hold every descriptor and descriptor revision for every layer of a backup during a backup or restore job. This patch also changes the access pattern for descriptors and descriptor changes to use iterators, so that they can be accessed in a streaming manner from the external SST. Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_job.go | 39 +- pkg/ccl/backupccl/backup_metadata_test.go | 109 ++--- pkg/ccl/backupccl/backup_planning.go | 45 +- pkg/ccl/backupccl/backup_test.go | 16 +- pkg/ccl/backupccl/backupinfo/BUILD.bazel | 19 +- .../backupccl/backupinfo/backup_metadata.go | 441 ++++++++++++------ .../backupccl/backupinfo/manifest_handling.go | 277 +++++++++-- .../backupinfo/manifest_handling_test.go | 300 ++++++++++++ pkg/ccl/backupccl/backuppb/backup.proto | 7 +- pkg/ccl/backupccl/bench_covering_test.go | 3 +- .../full_cluster_backup_restore_test.go | 3 +- .../generative_split_and_scatter_processor.go | 6 +- ...rative_split_and_scatter_processor_test.go | 17 +- pkg/ccl/backupccl/restore_job.go | 15 +- pkg/ccl/backupccl/restore_planning.go | 43 +- pkg/ccl/backupccl/restore_span_covering.go | 203 ++------ .../backupccl/restore_span_covering_test.go | 335 ++++++++++--- pkg/ccl/backupccl/show.go | 109 +++-- pkg/ccl/backupccl/targets.go | 54 ++- pkg/sql/execinfrapb/api.go | 1 + pkg/util/bulk/BUILD.bazel | 5 +- pkg/util/bulk/iterator.go | 46 ++ 23 files changed, 1517 insertions(+), 577 deletions(-) create mode 100644 pkg/ccl/backupccl/backupinfo/manifest_handling_test.go create mode 100644 pkg/util/bulk/iterator.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 178f54abe4c6..45527c8563e5 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -266,6 +266,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 4f04d3e4483e..bffabd0ded95 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -146,22 +146,26 @@ func backup( // TODO(benesch): verify these files, rather than accepting them as truth // blindly. // No concurrency yet, so these assignments are safe. 
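// The loop rewritten below — and every similar loop in this patch — consumes
// manifest contents through the generic iterator contract added in
// pkg/util/bulk/iterator.go, whose contents are not shown in this excerpt of
// the diff. The following is a minimal sketch of that contract as implied by
// the call sites (the CollectToSlice helper appears later in
// backup_metadata_test.go); treat it as a reconstruction, not the verbatim
// new file.
package bulk

// Iterator is a simple iterator over values of type T.
type Iterator[T any] interface {
	// Valid reports whether the iterator is positioned at a value; a non-nil
	// error is terminal and ends iteration.
	Valid() (bool, error)
	// Value returns the current value. It is only meaningful when Valid
	// returned true.
	Value() T
	// Next advances the iterator to the next value.
	Next()
	// Close releases any resources held by the iterator.
	Close()
}

// CollectToSlice drains an Iterator into a slice.
func CollectToSlice[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}
		out = append(out, it.Value())
	}
	return out, nil
}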
- it, err := makeBackupManifestFileIterator(ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, - *backupManifest, encryption, &kmsEnv) + iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return roachpb.RowCount{}, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return roachpb.RowCount{}, err + } else if !ok { + break + } + + f := it.Value() if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() { completedIntroducedSpans = append(completedIntroducedSpans, f.Span) } else { completedSpans = append(completedSpans, f.Span) } } - if it.err() != nil { - return roachpb.RowCount{}, it.err() - } // Subtract out any completed spans. spans := filterSpans(backupManifest.Spans, completedSpans) @@ -340,13 +344,11 @@ func backup( // Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy // fields elided from the `BACKUP_MANIFEST`. - // - // TODO(adityamaru,rhu713): Once backup/restore switches from writing and - // reading backup manifests to `metadata.sst` we can stop writing the slim - // manifest. - if err := backupinfo.WriteFilesListMetadataWithSSTs(ctx, defaultStore, encryption, - &kmsEnv, backupManifest); err != nil { - return roachpb.RowCount{}, err + if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) { + if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption, + &kmsEnv, backupManifest); err != nil { + return roachpb.RowCount{}, err + } } var tableStatistics []*stats.TableStatisticProto @@ -921,12 +923,19 @@ func getBackupDetailAndManifest( return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err } + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv) + if err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + backupManifest, err := createBackupManifest( ctx, execCfg, txn, updatedDetails, - prevBackups) + prevBackups, + layerToIterFactory, + ) if err != nil { return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err } diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index 60f736c85360..b15df4d52e69 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -26,8 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -36,6 +36,8 @@ import ( "github.com/stretchr/testify/require" ) +// TestMetadataSST has to be in backupccl_test in order to be sure that the +// BACKUP planhook is registered. 
func TestMetadataSST(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -112,14 +114,8 @@ func checkMetadata( } checkManifest(t, m, bm) - // If there are descriptor changes, we only check those as they should have - // all changes as well as existing descriptors - if len(m.DescriptorChanges) > 0 { - checkDescriptorChanges(ctx, t, m, bm) - } else { - checkDescriptors(ctx, t, m, bm) - } - + checkDescriptorChanges(ctx, t, m, bm) + checkDescriptors(ctx, t, m, bm) checkSpans(ctx, t, m, bm) // Don't check introduced spans on the first backup. if m.StartTime != (hlc.Timestamp{}) { @@ -147,16 +143,17 @@ func checkDescriptors( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaDescs []descpb.Descriptor - var desc descpb.Descriptor - it := bm.DescIter(ctx) + it := bm.NewDescIter(ctx) defer it.Close() - for it.Next(&desc) { - metaDescs = append(metaDescs, desc) - } + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } - if it.Err() != nil { - t.Fatal(it.Err()) + metaDescs = append(metaDescs, *it.Value()) } require.Equal(t, m.Descriptors, metaDescs) @@ -166,15 +163,16 @@ func checkDescriptorChanges( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaRevs []backuppb.BackupManifest_DescriptorRevision - var rev backuppb.BackupManifest_DescriptorRevision - it := bm.DescriptorChangesIter(ctx) + it := bm.NewDescriptorChangesIter(ctx) defer it.Close() - for it.Next(&rev) { - metaRevs = append(metaRevs, rev) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + metaRevs = append(metaRevs, *it.Value()) } // Descriptor Changes are sorted by time in the manifest. 
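// Each check helper above and below hand-rolls the same
// `for ; ; it.Next() { if ok, err := it.Valid(); ... }` drain loop. A generic
// adapter along these lines (illustrative only; it is not part of this patch)
// captures the shape once:
package backupccl_test

import "github.com/cockroachdb/cockroach/pkg/util/bulk"

// forEach drains it, applying fn to each element, and closes the iterator.
func forEach[T any](it bulk.Iterator[T], fn func(T) error) error {
	defer it.Close()
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return err
		} else if !ok {
			return nil
		}
		if err := fn(it.Value()); err != nil {
			return err
		}
	}
}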
@@ -214,15 +212,17 @@ func checkSpans( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaSpans []roachpb.Span - var span roachpb.Span - it := bm.SpanIter(ctx) + it := bm.NewSpanIter(ctx) defer it.Close() - for it.Next(&span) { - metaSpans = append(metaSpans, span) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + metaSpans = append(metaSpans, it.Value()) } require.Equal(t, m.Spans, metaSpans) @@ -232,14 +232,16 @@ func checkIntroducedSpans( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaSpans []roachpb.Span - var span roachpb.Span - it := bm.IntroducedSpanIter(ctx) + it := bm.NewIntroducedSpanIter(ctx) defer it.Close() - for it.Next(&span) { - metaSpans = append(metaSpans, span) - } - if it.Err() != nil { - t.Fatal(it.Err()) + + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + metaSpans = append(metaSpans, it.Value()) } require.Equal(t, m.IntroducedSpans, metaSpans) @@ -249,15 +251,17 @@ func checkTenants( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaTenants []descpb.TenantInfoWithUsage - var tenant descpb.TenantInfoWithUsage - it := bm.TenantIter(ctx) + it := bm.NewTenantIter(ctx) defer it.Close() - for it.Next(&tenant) { - metaTenants = append(metaTenants, tenant) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + metaTenants = append(metaTenants, it.Value()) } require.Equal(t, m.Tenants, metaTenants) @@ -275,18 +279,17 @@ func checkStats( if err != nil { t.Fatal(err) } + if len(expectedStats) == 0 { + expectedStats = nil + } - var metaStats = make([]*stats.TableStatisticProto, 0) - var s *stats.TableStatisticProto - it := bm.StatsIter(ctx) + it := bm.NewStatsIter(ctx) defer it.Close() - - for it.Next(&s) { - metaStats = append(metaStats, s) - } - if it.Err() != nil { - t.Fatal(it.Err()) + metaStats, err := bulk.CollectToSlice(it) + if err != nil { + t.Fatal(err) } + require.Equal(t, expectedStats, metaStats) } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 3f1161a8efa0..33041ec6944e 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1053,6 +1053,7 @@ func getReintroducedSpans( ctx context.Context, execCfg *sql.ExecutorConfig, prevBackups []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, tables []catalog.TableDescriptor, revs []backuppb.BackupManifest_DescriptorRevision, endTime hlc.Timestamp, @@ -1078,12 +1079,21 @@ func getReintroducedSpans( // at backup time, we must find all tables in manifest.DescriptorChanges whose // last change brought the table offline. offlineInLastBackup := make(map[descpb.ID]struct{}) - lastBackup := prevBackups[len(prevBackups)-1] + lastIterFactory := layerToIterFactory[len(prevBackups)-1] + + descIt := lastIterFactory.NewDescIter(ctx) + defer descIt.Close() + + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return nil, err + } else if !ok { + break + } - for _, desc := range lastBackup.Descriptors { // TODO(pbardea): Also check that lastWriteTime is set once those are // populated on the table descriptor. 
- if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Offline() { + if table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); table != nil && table.Offline() { offlineInLastBackup[table.GetID()] = struct{}{} } } @@ -1093,8 +1103,16 @@ func getReintroducedSpans( // change in the previous backup interval put the table offline, then that // backup was offline at the endTime of the last backup. latestTableDescChangeInLastBackup := make(map[descpb.ID]*descpb.TableDescriptor) - for _, rev := range lastBackup.DescriptorChanges { - if table, _, _, _, _ := descpb.GetDescriptors(rev.Desc); table != nil { + descRevIt := lastIterFactory.NewDescriptorChangesIter(ctx) + defer descRevIt.Close() + for ; ; descRevIt.Next() { + if ok, err := descRevIt.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + if table, _, _, _, _ := descpb.GetDescriptors(descRevIt.Value().Desc); table != nil { if trackedRev, ok := latestTableDescChangeInLastBackup[table.GetID()]; !ok { latestTableDescChangeInLastBackup[table.GetID()] = table } else if trackedRev.Version < table.Version { @@ -1341,6 +1359,7 @@ func createBackupManifest( txn *kv.Txn, jobDetails jobspb.BackupDetails, prevBackups []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, ) (backuppb.BackupManifest, error) { mvccFilter := backuppb.MVCCFilter_Latest if jobDetails.RevisionHistory { @@ -1414,9 +1433,17 @@ func createBackupManifest( if len(prevBackups) > 0 { tablesInPrev := make(map[descpb.ID]struct{}) dbsInPrev := make(map[descpb.ID]struct{}) - rawDescs := prevBackups[len(prevBackups)-1].Descriptors - for i := range rawDescs { - if t, _, _, _, _ := descpb.GetDescriptors(&rawDescs[i]); t != nil { + + descIt := layerToIterFactory[len(prevBackups)-1].NewDescIter(ctx) + defer descIt.Close() + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return backuppb.BackupManifest{}, err + } else if !ok { + break + } + + if t, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); t != nil { tablesInPrev[t.ID] = struct{}{} } } @@ -1437,7 +1464,7 @@ func createBackupManifest( newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans) - reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime) + reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, layerToIterFactory, tables, revs, endTime) if err != nil { return backuppb.BackupManifest{}, err } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 6fa6318d3a6a..be7ffd603db0 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -8132,8 +8132,8 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { } // TestIncorrectAccessOfFilesInBackupMetadata ensures that an accidental use of -// the `Files` field (instead of its dedicated SST) on the `BACKUP_METADATA` -// results in an error on restore and show. +// the `Descriptors` field (instead of its dedicated SST) on the +// `BACKUP_METADATA` results in an error on restore and show. func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -8158,13 +8158,13 @@ func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { var backupManifest backuppb.BackupManifest require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest)) - // The manifest should have `HasExternalFilesList` set to true. 
- require.True(t, backupManifest.HasExternalFilesList) + // The manifest should have `HasExternalManifestSSTs` set to true. + require.True(t, backupManifest.HasExternalManifestSSTs) // Set it to false, so that any operation that resolves the metadata treats // this manifest as a pre-23.1 BACKUP_MANIFEST, and directly accesses the - // `Files` field, instead of reading from the external SST. - backupManifest.HasExternalFilesList = false + // `Descriptors` field, instead of reading from the external SST. + backupManifest.HasExternalManifestSSTs = false manifestData, err = protoutil.Marshal(&backupManifest) require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath, manifestData, 0644 /* perm */)) @@ -8174,7 +8174,7 @@ func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { require.NoError(t, os.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Expect an error on restore. - sqlDB.ExpectErr(t, "assertion: this placeholder legacy Files entry should never be opened", `RESTORE DATABASE r1 FROM LATEST IN 'nodelocal://0/test' WITH new_db_name = 'r2'`) + sqlDB.ExpectErr(t, "assertion: this placeholder legacy Descriptor entry should never be used", `RESTORE DATABASE r1 FROM LATEST IN 'nodelocal://0/test' WITH new_db_name = 'r2'`) } func TestManifestTooNew(t *testing.T) { @@ -8285,7 +8285,7 @@ func flipBitInManifests(t *testing.T, rawDir string) { foundManifest := false err := filepath.Walk(rawDir, func(path string, info os.FileInfo, err error) error { log.Infof(context.Background(), "visiting %s", path) - if filepath.Base(path) == backupbase.BackupMetadataName { + if filepath.Base(path) == backupbase.BackupManifestName { foundManifest = true data, err := os.ReadFile(path) require.NoError(t, err) diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel index 2bf6b1cb2bd6..c642036cfba7 100644 --- a/pkg/ccl/backupccl/backupinfo/BUILD.bazel +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/sql/stats", "//pkg/storage", "//pkg/util", + "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", @@ -55,17 +56,33 @@ go_library( go_test( name = "backupinfo_test", - srcs = ["main_test.go"], + srcs = [ + "main_test.go", + "manifest_handling_test.go", + ], args = ["-test.timeout=295s"], embed = [":backupinfo"], deps = [ + "//pkg/base", + "//pkg/blobs", + "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/utilccl", + "//pkg/cloud", + "//pkg/cloud/impl:cloudimpl", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", + "//pkg/sql", + "//pkg/sql/catalog/descpb", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", + "//pkg/util/bulk", + "//pkg/util/hlc", + "//pkg/util/leaktest", "//pkg/util/randutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index 9b45d71ab3bb..a6a136e5d933 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -41,6 
+42,11 @@ const ( // BackupManifest_Files of the backup. This file is always written in // conjunction with the `BACKUP_METADATA`. BackupMetadataFilesListPath = "filelist.sst" + // BackupMetadataDescriptorsListPath is the name of the SST file containing + // the BackupManifest_Descriptors or BackupManifest_DescriptorRevisions of the + // backup. This file is always written in conjunction with the + // `BACKUP_METADATA`. + BackupMetadataDescriptorsListPath = "descriptorslist.sst" // FileInfoPath is the name of the SST file containing the // BackupManifest_Files of the backup. FileInfoPath = "fileinfo.sst" @@ -200,18 +206,25 @@ func writeManifestToMetadata( return sst.PutUnversioned(roachpb.Key(sstBackupKey), b) } +// DescChangesLess gives an ordering to two BackupManifest_DescriptorRevision. +func DescChangesLess( + left *backuppb.BackupManifest_DescriptorRevision, + right *backuppb.BackupManifest_DescriptorRevision, +) bool { + if left.ID != right.ID { + return left.ID < right.ID + } + + return !left.Time.Less(right.Time) +} + func writeDescsToMetadata( ctx context.Context, sst storage.SSTWriter, m *backuppb.BackupManifest, ) error { // Add descriptors from revisions if available, Descriptors if not. if len(m.DescriptorChanges) > 0 { sort.Slice(m.DescriptorChanges, func(i, j int) bool { - if m.DescriptorChanges[i].ID < m.DescriptorChanges[j].ID { - return true - } else if m.DescriptorChanges[i].ID == m.DescriptorChanges[j].ID { - return !m.DescriptorChanges[i].Time.Less(m.DescriptorChanges[j].Time) - } - return false + return DescChangesLess(&m.DescriptorChanges[i], &m.DescriptorChanges[j]) }) for _, i := range m.DescriptorChanges { k := encodeDescSSTKey(i.ID) @@ -248,7 +261,7 @@ func writeDescsToMetadata( // changes in an incremental backup, it's helpful to have existing // descriptors at the start time, so we don't have to look back further // than the very last backup. - if m.StartTime.IsEmpty() { + if m.StartTime.IsEmpty() || m.MVCCFilter == backuppb.MVCCFilter_Latest { if err := sst.PutUnversioned(k, b); err != nil { return err } @@ -262,6 +275,39 @@ func writeDescsToMetadata( return nil } +// WriteDescsSST is responsible for writing the SST containing the Descriptor +// and DescriptorChanges field of the input BackupManifest. If DescriptorChanges +// is non-empty, then the descriptor changes will be written to the SST with the +// MVCC timestamp equal to the revision time. Otherwise, contents of the +// Descriptors field will be written to the SST with an empty MVCC timestamp. +func WriteDescsSST( + ctx context.Context, + m *backuppb.BackupManifest, + dest cloud.ExternalStorage, + enc *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, + path string, +) error { + w, err := makeWriter(ctx, dest, path, enc, kmsEnv) + if err != nil { + return err + } + defer w.Close() + descSST := storage.MakeBackupSSTWriter(ctx, dest.Settings(), w) + defer descSST.Close() + + if err := writeDescsToMetadata(ctx, descSST, m); err != nil { + return err + } + + if err := descSST.Finish(); err != nil { + return err + } + + return w.Close() +} + +// FileCmp gives an ordering to two backuppb.BackupManifest_File. func FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int { if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 { return cmp @@ -931,29 +977,34 @@ func NewBackupMetadata( type SpanIterator struct { backing bytesIter filter func(key storage.MVCCKey) bool + value *roachpb.Span err error } -// SpanIter creates a new SpanIterator for the backup metadata. 
-func (b *BackupMetadata) SpanIter(ctx context.Context) SpanIterator { +// NewSpanIter creates a new SpanIterator for the backup metadata. +func (b *BackupMetadata) NewSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, true, b.kmsEnv) - return SpanIterator{ + it := SpanIterator{ backing: backing, } + it.Next() + return &it } -// IntroducedSpanIter creates a new IntroducedSpanIterator for the backup metadata. -func (b *BackupMetadata) IntroducedSpanIter(ctx context.Context) SpanIterator { +// NewIntroducedSpanIter creates a new IntroducedSpanIterator for the backup metadata. +func (b *BackupMetadata) NewIntroducedSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, false, b.kmsEnv) - return SpanIterator{ + it := SpanIterator{ backing: backing, filter: func(key storage.MVCCKey) bool { return key.Timestamp == hlc.Timestamp{} }, } + it.Next() + return &it } // Close closes the iterator. @@ -961,37 +1012,41 @@ func (si *SpanIterator) Close() { si.backing.close() } -// Err returns the iterator's error -func (si *SpanIterator) Err() error { +// Valid implements the Iterator interface. +func (si *SpanIterator) Valid() (bool, error) { if si.err != nil { - return si.err + return false, si.err } - return si.backing.err() + return si.value != nil, si.err } -// Next retrieves the next span in the iterator. -// -// Next returns true if next element was successfully unmarshalled into span, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (si *SpanIterator) Next(span *roachpb.Span) bool { +// Value implements the Iterator interface. +func (si *SpanIterator) Value() roachpb.Span { + if si.value == nil { + return roachpb.Span{} + } + return *si.value +} + +// Next implements the Iterator interface. +func (si *SpanIterator) Next() { wrapper := resultWrapper{} + var nextSpan *roachpb.Span for si.backing.next(&wrapper) { if si.filter == nil || si.filter(wrapper.key) { sp, err := decodeSpanSSTKey(wrapper.key.Key) if err != nil { si.err = err - return false + return } - *span = sp - return true + nextSpan = &sp + break } } - return false + si.value = nextSpan } // FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File. @@ -1002,7 +1057,9 @@ type FileIterator struct { } // NewFileIter creates a new FileIterator for the backup metadata. -func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) { +func (b *BackupMetadata) NewFileIter( + ctx context.Context, +) (bulk.Iterator[*backuppb.BackupManifest_File], error) { fileInfoIter := makeBytesIter(ctx, b.store, b.filename, []byte(sstFilesPrefix), b.enc, false, b.kmsEnv) defer fileInfoIter.close() @@ -1030,13 +1087,7 @@ func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) if fileInfoIter.err() != nil { return nil, fileInfoIter.err() } - - iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encOpts, iterOpts) - if err != nil { - return nil, err - } - iter.SeekGE(storage.MVCCKey{}) - return &FileIterator{mergedIterator: iter}, nil + return newFileSSTIter(ctx, storeFiles, encOpts) } // NewFileSSTIter creates a new FileIterator to iterate over the storeFile. 
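// Note the pattern shared by the New*Iter constructors in this file: each
// calls Next() once before returning, so a fresh iterator is already
// positioned on its first element (or is immediately invalid when empty), and
// a caller can check Valid() straight away. A usage sketch; `md` and
// `process` are assumed stand-ins, not names from this patch:
func iterateFiles(
	ctx context.Context, md *BackupMetadata, process func(*backuppb.BackupManifest_File),
) error {
	it, err := md.NewFileIter(ctx)
	if err != nil {
		return err
	}
	defer it.Close()
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return err
		} else if !ok {
			return nil
		}
		process(it.Value())
	}
}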
@@ -1044,12 +1095,20 @@ func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) func NewFileSSTIter( ctx context.Context, storeFile storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, ) (*FileIterator, error) { - iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{storeFile}, encOpts, iterOpts) + return newFileSSTIter(ctx, []storageccl.StoreFile{storeFile}, encOpts) +} + +func newFileSSTIter( + ctx context.Context, storeFiles []storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, +) (*FileIterator, error) { + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encOpts, iterOpts) if err != nil { return nil, err } iter.SeekGE(storage.MVCCKey{}) - return &FileIterator{mergedIterator: iter}, nil + fi := &FileIterator{mergedIterator: iter} + fi.Next() + return fi, nil } // Close closes the iterator. @@ -1063,55 +1122,55 @@ func (fi *FileIterator) Valid() (bool, error) { return false, fi.err } - if ok, err := fi.mergedIterator.Valid(); !ok { - fi.err = err - return ok, err + return fi.file != nil, nil +} + +// Value implements the Iterator interface. +func (fi *FileIterator) Value() *backuppb.BackupManifest_File { + return fi.file +} + +// Next implements the Iterator interface. +func (fi *FileIterator) Next() { + if fi.err != nil { + return } - if fi.file == nil { - v := fi.mergedIterator.UnsafeValue() - file := &backuppb.BackupManifest_File{} - err := protoutil.Unmarshal(v, file) + if ok, err := fi.mergedIterator.Valid(); !ok { if err != nil { fi.err = err - return false, fi.err } - fi.file = file + fi.file = nil + return } - return true, nil -} -// Value returns the current value of the iterator, if valid. -func (fi *FileIterator) Value() *backuppb.BackupManifest_File { - return fi.file -} + v := fi.mergedIterator.UnsafeValue() + file := &backuppb.BackupManifest_File{} + err := protoutil.Unmarshal(v, file) + if err != nil { + fi.err = err + return + } -// Next advances the iterator the the next value. -func (fi *FileIterator) Next() { + fi.file = file fi.mergedIterator.Next() - fi.file = nil -} - -// Reset resets the iterator to the first value. -func (fi *FileIterator) Reset() { - fi.mergedIterator.SeekGE(storage.MVCCKey{}) - fi.err = nil - fi.file = nil } // DescIterator is a simple iterator to iterate over descpb.Descriptors. type DescIterator struct { backing bytesIter + value *descpb.Descriptor err error } -// DescIter creates a new DescIterator for the backup metadata. -func (b *BackupMetadata) DescIter(ctx context.Context) DescIterator { - backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, - true, b.kmsEnv) - return DescIterator{ +// NewDescIter creates a new DescIterator for the backup metadata. +func (b *BackupMetadata) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] { + backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, true, b.kmsEnv) + it := DescIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1119,52 +1178,63 @@ func (di *DescIterator) Close() { di.backing.close() } -// Err returns the iterator's error. -func (di *DescIterator) Err() error { +// Valid implements the Iterator interface. +func (di *DescIterator) Valid() (bool, error) { if di.err != nil { - return di.err + return false, di.err } - return di.backing.err() + return di.value != nil, nil } -// Next retrieves the next descriptor in the iterator. 
-// -// Next returns true if next element was successfully unmarshalled into desc , -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (di *DescIterator) Next(desc *descpb.Descriptor) bool { - wrapper := resultWrapper{} +// Value implements the Iterator interface. +func (di *DescIterator) Value() *descpb.Descriptor { + return di.value +} + +// Next implements the Iterator interface. +func (di *DescIterator) Next() { + if di.err != nil { + return + } + wrapper := resultWrapper{} + var nextValue *descpb.Descriptor + descHolder := descpb.Descriptor{} for di.backing.next(&wrapper) { - err := protoutil.Unmarshal(wrapper.value, desc) + err := protoutil.Unmarshal(wrapper.value, &descHolder) if err != nil { di.err = err - return false + return } - tbl, db, typ, sc, fn := descpb.GetDescriptors(desc) + tbl, db, typ, sc, fn := descpb.GetDescriptors(&descHolder) if tbl != nil || db != nil || typ != nil || sc != nil || fn != nil { - return true + nextValue = &descHolder + break } } - return false + di.value = nextValue } // TenantIterator is a simple iterator to iterate over TenantInfoWithUsages. type TenantIterator struct { backing bytesIter + value *descpb.TenantInfoWithUsage err error } -// TenantIter creates a new TenantIterator for the backup metadata. -func (b *BackupMetadata) TenantIter(ctx context.Context) TenantIterator { +// NewTenantIter creates a new TenantIterator for the backup metadata. +func (b *BackupMetadata) NewTenantIter( + ctx context.Context, +) bulk.Iterator[descpb.TenantInfoWithUsage] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstTenantsPrefix), b.enc, false, b.kmsEnv) - return TenantIterator{ + it := TenantIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1172,62 +1242,91 @@ func (ti *TenantIterator) Close() { ti.backing.close() } -// Err returns the iterator's error. -func (ti *TenantIterator) Err() error { +// Valid implements the Iterator interface. +func (ti *TenantIterator) Valid() (bool, error) { if ti.err != nil { - return ti.err + return false, ti.err } - return ti.backing.err() + return ti.value != nil, nil } -// Next retrieves the next tenant in the iterator. -// -// Next returns true if next element was successfully unmarshalled into tenant, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (ti *TenantIterator) Next(tenant *descpb.TenantInfoWithUsage) bool { +// Value implements the Iterator interface. +func (ti *TenantIterator) Value() descpb.TenantInfoWithUsage { + if ti.value == nil { + return descpb.TenantInfoWithUsage{} + } + return *ti.value +} + +// Next implements the Iterator interface. +func (ti *TenantIterator) Next() { + if ti.err != nil { + return + } + wrapper := resultWrapper{} ok := ti.backing.next(&wrapper) if !ok { - return false + if ti.backing.err() != nil { + ti.err = ti.backing.err() + } + ti.value = nil + return } - err := protoutil.Unmarshal(wrapper.value, tenant) + tenant := descpb.TenantInfoWithUsage{} + + err := protoutil.Unmarshal(wrapper.value, &tenant) if err != nil { ti.err = err - return false + return } - return true + ti.value = &tenant } // DescriptorRevisionIterator is a simple iterator to iterate over backuppb.BackupManifest_DescriptorRevisions. 
type DescriptorRevisionIterator struct { backing bytesIter err error + value *backuppb.BackupManifest_DescriptorRevision } -// DescriptorChangesIter creates a new DescriptorChangesIterator for the backup metadata. -func (b *BackupMetadata) DescriptorChangesIter(ctx context.Context) DescriptorRevisionIterator { +// NewDescriptorChangesIter creates a new DescriptorChangesIterator for the backup metadata. +func (b *BackupMetadata) NewDescriptorChangesIter( + ctx context.Context, +) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] { + if b.MVCCFilter == backuppb.MVCCFilter_Latest { + var backing []backuppb.BackupManifest_DescriptorRevision + return newSlicePointerIterator(backing) + } + backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, false, b.kmsEnv) - return DescriptorRevisionIterator{ + dri := DescriptorRevisionIterator{ backing: backing, } -} -// Close closes the iterator. -func (dri *DescriptorRevisionIterator) Close() { - dri.backing.close() + dri.Next() + return &dri } -// Err returns the iterator's error. -func (dri *DescriptorRevisionIterator) Err() error { +// Valid implements the Iterator interface. +func (dri *DescriptorRevisionIterator) Valid() (bool, error) { if dri.err != nil { - return dri.err + return false, dri.err } - return dri.backing.err() + return dri.value != nil, nil +} + +// Value implements the Iterator interface. +func (dri *DescriptorRevisionIterator) Value() *backuppb.BackupManifest_DescriptorRevision { + return dri.value +} + +// Close closes the iterator. +func (dri *DescriptorRevisionIterator) Close() { + dri.backing.close() } // Next retrieves the next descriptor revision in the iterator. @@ -1236,62 +1335,72 @@ func (dri *DescriptorRevisionIterator) Err() error { // revision, and false if there are no more elements or if an error was // encountered. When Next returns false, the user should call the Err method to // verify the existence of an error. -func (dri *DescriptorRevisionIterator) Next( - revision *backuppb.BackupManifest_DescriptorRevision, -) bool { +func (dri *DescriptorRevisionIterator) Next() { + if dri.err != nil { + return + } + wrapper := resultWrapper{} ok := dri.backing.next(&wrapper) if !ok { - return false + if err := dri.backing.err(); err != nil { + dri.err = err + } + + dri.value = nil + return } - err := unmarshalWrapper(&wrapper, revision) + nextRev, err := unmarshalWrapper(&wrapper) if err != nil { dri.err = err - return false + return } - return true + dri.value = &nextRev } -func unmarshalWrapper( - wrapper *resultWrapper, rev *backuppb.BackupManifest_DescriptorRevision, -) error { +func unmarshalWrapper(wrapper *resultWrapper) (backuppb.BackupManifest_DescriptorRevision, error) { var desc *descpb.Descriptor if len(wrapper.value) > 0 { desc = &descpb.Descriptor{} err := protoutil.Unmarshal(wrapper.value, desc) if err != nil { - return err + return backuppb.BackupManifest_DescriptorRevision{}, err } } id, err := decodeDescSSTKey(wrapper.key.Key) if err != nil { - return err + return backuppb.BackupManifest_DescriptorRevision{}, err } - *rev = backuppb.BackupManifest_DescriptorRevision{ + rev := backuppb.BackupManifest_DescriptorRevision{ Desc: desc, ID: id, Time: wrapper.key.Timestamp, } - return nil + return rev, nil } // StatsIterator is a simple iterator to iterate over stats.TableStatisticProtos. type StatsIterator struct { backing bytesIter + value *stats.TableStatisticProto err error } -// StatsIter creates a new StatsIterator for the backup metadata. 
-func (b *BackupMetadata) StatsIter(ctx context.Context) StatsIterator { +// NewStatsIter creates a new StatsIterator for the backup metadata. +func (b *BackupMetadata) NewStatsIter( + ctx context.Context, +) bulk.Iterator[*stats.TableStatisticProto] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstStatsPrefix), b.enc, false, b.kmsEnv) - return StatsIterator{ + it := StatsIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1299,37 +1408,44 @@ func (si *StatsIterator) Close() { si.backing.close() } -// Err returns the iterator's error. -func (si *StatsIterator) Err() error { +// Valid implements the Iterator interface. +func (si *StatsIterator) Valid() (bool, error) { if si.err != nil { - return si.err + return false, si.err } - return si.backing.err() + return si.value != nil, nil } -// Next retrieves the next stats proto in the iterator. -// -// Next returns true if next element was successfully unmarshalled into -// statsPtr, and false if there are no more elements or if an error was -// encountered. When Next returns false, the user should call the Err method to verify the -// existence of an error. -func (si *StatsIterator) Next(statsPtr **stats.TableStatisticProto) bool { +// Value implements the Iterator interface. +func (si *StatsIterator) Value() *stats.TableStatisticProto { + return si.value +} + +// Next implements the Iterator interface. +func (si *StatsIterator) Next() { + if si.err != nil { + return + } + wrapper := resultWrapper{} ok := si.backing.next(&wrapper) if !ok { - return false + if err := si.backing.err(); err != nil { + si.err = err + } + si.value = nil + return } var s stats.TableStatisticProto err := protoutil.Unmarshal(wrapper.value, &s) if err != nil { si.err = err - return false + return } - *statsPtr = &s - return true + si.value = &s } type bytesIter struct { @@ -1379,7 +1495,6 @@ func (bi *bytesIter) next(resWrapper *resultWrapper) bool { valid, err := bi.Iter.Valid() if err != nil || !valid || !bytes.HasPrefix(bi.Iter.UnsafeKey().Key, bi.prefix) { - bi.close() bi.iterError = err return false } @@ -1413,3 +1528,35 @@ type resultWrapper struct { key storage.MVCCKey value []byte } + +type sliceIterator[T any] struct { + backingSlice []T + idx int +} + +var _ bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] = &sliceIterator[backuppb.BackupManifest_DescriptorRevision]{} + +func newSlicePointerIterator[T any](backing []T) *sliceIterator[T] { + return &sliceIterator[T]{ + backingSlice: backing, + } +} + +func (s *sliceIterator[T]) Valid() (bool, error) { + return s.idx < len(s.backingSlice), nil +} + +func (s *sliceIterator[T]) Value() *T { + if s.idx < len(s.backingSlice) { + return &s.backingSlice[s.idx] + } + + return nil +} + +func (s *sliceIterator[T]) Next() { + s.idx++ +} + +func (s *sliceIterator[T]) Close() { +} diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index e43a178899a8..ca5e30497b3a 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -15,7 +15,6 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "github.com/cockroachdb/cockroach/pkg/util" "path" "sort" "strconv" @@ -45,6 +44,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/bulk" 
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -178,7 +179,7 @@ func ReadBackupManifestFromStore( backupbase.BackupManifestName, encryption, kmsEnv) if backupManifestErr != nil { if !errors.Is(backupManifestErr, cloud.ErrFileDoesNotExist) { - return backuppb.BackupManifest{}, 0, err + return backuppb.BackupManifest{}, 0, backupManifestErr } // If we did not find a `BACKUP_MANIFEST` we look for a `BACKUP` file as @@ -190,11 +191,11 @@ func ReadBackupManifestFromStore( backupbase.BackupOldManifestName, encryption, kmsEnv) if oldBackupManifestErr != nil { return backuppb.BackupManifest{}, 0, oldBackupManifestErr - } else { - // We found a `BACKUP` manifest file. - manifest = oldBackupManifest - memSize = oldBackupManifestMemSize } + + // We found a `BACKUP` manifest file. + manifest = oldBackupManifest + memSize = oldBackupManifestMemSize } else { // We found a `BACKUP_MANIFEST` file. manifest = backupManifest @@ -561,28 +562,35 @@ func WriteBackupLock( return cloud.WriteFile(ctx, defaultStore, lockFileName, bytes.NewReader([]byte("lock"))) } -// WriteFilesListMetadataWithSSTs writes a "slim" version of manifest -// to `exportStore`. This version has the alloc heavy `Files` repeated field -// nil'ed out, and written to an accompanying SST instead. -func WriteFilesListMetadataWithSSTs( +// WriteMetadataWithExternalSSTs writes a "slim" version of manifest to +// `exportStore`. This version has the alloc heavy `Files`, `Descriptors`, and +// `DescriptorChanges` repeated fields nil'ed out, and written to an +// accompanying SST instead. +func WriteMetadataWithExternalSSTs( ctx context.Context, exportStore cloud.ExternalStorage, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, manifest *backuppb.BackupManifest, ) error { - if err := writeFilesListMetadata(ctx, exportStore, backupbase.BackupMetadataName, encryption, - kmsEnv, manifest); err != nil { - return errors.Wrap(err, "failed to write the backup metadata with external Files list") + if err := WriteFilesListSST(ctx, exportStore, encryption, kmsEnv, manifest, + BackupMetadataFilesListPath); err != nil { + return errors.Wrap(err, "failed to write backup metadata Files SST") + } + + if err := WriteDescsSST(ctx, manifest, exportStore, encryption, kmsEnv, BackupMetadataDescriptorsListPath); err != nil { + return errors.Wrap(err, "failed to write backup metadata descriptors SST") } - return errors.Wrap(WriteFilesListSST(ctx, exportStore, encryption, kmsEnv, manifest, - BackupMetadataFilesListPath), "failed to write backup metadata Files SST") + + return errors.Wrap(writeExternalSSTsMetadata(ctx, exportStore, backupbase.BackupMetadataName, encryption, + kmsEnv, manifest), "failed to write the backup metadata with external Files list") } -// writeFilesListMetadata compresses and writes a slimmer version of the -// BackupManifest `desc` to `exportStore` with the `Files` field of the proto -// set to a bogus value that will error out on incorrect use. -func writeFilesListMetadata( +// writeExternalSSTsMetadata compresses and writes a slimmer version of the +// BackupManifest `desc` to `exportStore` with the `Files`, `Descriptors`, and +// `DescriptorChanges` fields of the proto set to bogus values that will error +// out on incorrect use. 
+func writeExternalSSTsMetadata(
 	ctx context.Context,
 	exportStore cloud.ExternalStorage,
 	filename string,
@@ -598,7 +606,31 @@ func writeFilesListMetadata(
 		Path: "assertion: this placeholder legacy Files entry should never be opened",
 	}
 	slimManifest.Files = []backuppb.BackupManifest_File{bogusFile}
-	slimManifest.HasExternalFilesList = true
+
+	// We write a bogus descriptor to Descriptors and DescriptorChanges with max
+	// timestamp as the modification time so RunPostDeserializationChanges()
+	// always fails on restore.
+	bogusTableID := descpb.ID(1)
+	bogusTableDesc := descpb.Descriptor{
+		Union: &descpb.Descriptor_Table{
+			Table: &descpb.TableDescriptor{
+				ID:               bogusTableID,
+				Name:             "assertion: this placeholder legacy Descriptor entry should never be used",
+				Version:          1,
+				ModificationTime: hlc.MaxTimestamp,
+			},
+		},
+	}
+	slimManifest.Descriptors = []descpb.Descriptor{bogusTableDesc}
+
+	bogusDescRev := backuppb.BackupManifest_DescriptorRevision{
+		ID:   bogusTableID,
+		Time: hlc.MaxTimestamp,
+		Desc: &bogusTableDesc,
+	}
+	slimManifest.DescriptorChanges = []backuppb.BackupManifest_DescriptorRevision{bogusDescRev}
+
+	slimManifest.HasExternalManifestSSTs = true
 	return WriteBackupManifest(ctx, exportStore, filename, encryption, kmsEnv, &slimManifest)
 }
 
@@ -926,14 +958,23 @@ func GetBackupIndexAtTime(
 
 // LoadSQLDescsFromBackupsAtTime returns the Descriptors found in the last
 // (latest) backup with a StartTime >= asOf.
+//
+// TODO(rui): note that materializing all descriptors doesn't scale with
+// cluster size. We temporarily materialize all descriptors here to limit the
+// scope of changes needed for restore to iterate over the manifest's repeated
+// fields.
 func LoadSQLDescsFromBackupsAtTime(
-	backupManifests []backuppb.BackupManifest, asOf hlc.Timestamp,
+	ctx context.Context,
+	backupManifests []backuppb.BackupManifest,
+	layerToBackupManifestFileIterFactory LayerToBackupManifestFileIterFactory,
+	asOf hlc.Timestamp,
 ) ([]catalog.Descriptor, backuppb.BackupManifest, error) {
 	lastBackupManifest := backupManifests[len(backupManifests)-1]
+	lastIterFactory := layerToBackupManifestFileIterFactory[len(backupManifests)-1]
 
 	if asOf.IsEmpty() {
 		if lastBackupManifest.DescriptorCoverage != tree.AllDescriptors {
-			descs, err := BackupManifestDescriptors(&lastBackupManifest)
+			descs, err := BackupManifestDescriptors(ctx, lastIterFactory, lastBackupManifest.EndTime)
 			return descs, lastBackupManifest, err
 		}
 
@@ -950,21 +991,56 @@ func LoadSQLDescsFromBackupsAtTime(
 		}
 		lastBackupManifest = b
 	}
-	if len(lastBackupManifest.DescriptorChanges) == 0 {
-		descs, err := BackupManifestDescriptors(&lastBackupManifest)
-		return descs, lastBackupManifest, err
+
+	// From this point on we try to load descriptors based on descriptor
+	// revisions. The algorithm below assumes that descriptor revisions are sorted
+	// by DescChangesLess, which is a sort by descriptor ID, then descending by
+	// revision time for revisions with the same ID. The external SST for
+	// descriptors already has entries sorted in this order; we just have to make
+	// sure the in-memory descriptors in the manifest are ordered as well.
+	sort.Slice(lastBackupManifest.DescriptorChanges, func(i, j int) bool {
+		return DescChangesLess(&lastBackupManifest.DescriptorChanges[i], &lastBackupManifest.DescriptorChanges[j])
+	})
+
+	descRevIt := lastIterFactory.NewDescriptorChangesIter(ctx)
+	defer descRevIt.Close()
+	if ok, err := descRevIt.Valid(); err != nil {
+		return nil, backuppb.BackupManifest{}, err
+	} else if !ok {
+		descs, err := BackupManifestDescriptors(ctx, lastIterFactory, lastBackupManifest.EndTime)
+		if err != nil {
+			return nil, backuppb.BackupManifest{}, err
+		}
+		return descs, lastBackupManifest, nil
 	}
 
-	byID := make(map[descpb.ID]catalog.DescriptorBuilder, len(lastBackupManifest.Descriptors))
-	for _, rev := range lastBackupManifest.DescriptorChanges {
-		if asOf.Less(rev.Time) {
+	byID := make(map[descpb.ID]catalog.DescriptorBuilder, 0)
+	prevRevID := descpb.InvalidID
+	for ; ; descRevIt.Next() {
+		if ok, err := descRevIt.Valid(); err != nil {
+			return nil, backuppb.BackupManifest{}, err
+		} else if !ok {
 			break
 		}
-		if rev.Desc == nil {
-			delete(byID, rev.ID)
-		} else {
+
+		rev := descRevIt.Value()
+		if asOf.Less(rev.Time) {
+			continue
+		}
+
+		// At this point descriptor revisions are sorted by DescChangesLess, which
+		// is a sort by descriptor ID, then descending by revision time for
+		// revisions with the same ID. If we've already seen a revision for this
+		// descriptor ID that's not greater than asOf, then we can skip the rest of
+		// the revisions for the ID.
+		if rev.ID == prevRevID {
+			continue
+		}
+
+		if rev.Desc != nil {
 			byID[rev.ID] = newDescriptorBuilder(rev.Desc, rev.Time)
 		}
+		prevRevID = rev.ID
 	}
 
 	allDescs := make([]catalog.Descriptor, 0, len(byID))
@@ -1148,11 +1224,20 @@ func TempCheckpointFileNameForJob(jobID jobspb.JobID) string {
 // BackupManifestDescriptors returns the descriptors encoded in the manifest as
 // a slice of mutable descriptors.
 func BackupManifestDescriptors(
-	backupManifest *backuppb.BackupManifest,
+	ctx context.Context, iterFactory *IterFactory, endTime hlc.Timestamp,
 ) ([]catalog.Descriptor, error) {
-	ret := make([]catalog.Descriptor, 0, len(backupManifest.Descriptors))
-	for i := range backupManifest.Descriptors {
-		b := newDescriptorBuilder(&backupManifest.Descriptors[i], backupManifest.EndTime)
+	descIt := iterFactory.NewDescIter(ctx)
+	defer descIt.Close()
+
+	ret := make([]catalog.Descriptor, 0)
+	for ; ; descIt.Next() {
+		if ok, err := descIt.Valid(); err != nil {
+			return nil, err
+		} else if !ok {
+			break
+		}
+
+		b := newDescriptorBuilder(descIt.Value(), endTime)
 		if b == nil {
 			continue
 		}
@@ -1459,3 +1544,125 @@ func MakeBackupCodec(manifest backuppb.BackupManifest) (keys.SQLCodec, error) {
 	}
 	return backupCodec, nil
 }
+
+// IterFactory provides factory methods to construct iterators that iterate
+// over the `BackupManifest_Files`, `Descriptors`, and
+// `BackupManifest_DescriptorRevision` in a `BackupManifest`. It is the
+// caller's responsibility to close the returned iterators.
+type IterFactory struct {
+	m                 *backuppb.BackupManifest
+	store             cloud.ExternalStorage
+	fileSSTPath       string
+	descriptorSSTPath string
+	encryption        *jobspb.BackupEncryptionOptions
+	kmsEnv            cloud.KMSEnv
+}
+
+// NewIterFactory constructs a new IterFactory for a BackupManifest.
+func NewIterFactory(
+	m *backuppb.BackupManifest,
+	store cloud.ExternalStorage,
+	encryption *jobspb.BackupEncryptionOptions,
+	kmsEnv cloud.KMSEnv,
+) *IterFactory {
+	return &IterFactory{
+		m:                 m,
+		store:             store,
+		fileSSTPath:       BackupMetadataFilesListPath,
+		descriptorSSTPath: BackupMetadataDescriptorsListPath,
+		encryption:        encryption,
+		kmsEnv:            kmsEnv,
+	}
+}
+
+// LayerToBackupManifestFileIterFactory is the mapping from the index of the
+// backup layer to an IterFactory.
+type LayerToBackupManifestFileIterFactory map[int]*IterFactory
+
+// NewFileIter creates a new Iterator over BackupManifest_Files. It is assumed
+// that the BackupManifest_Files are sorted by FileCmp.
+func (f *IterFactory) NewFileIter(
+	ctx context.Context,
+) (bulk.Iterator[*backuppb.BackupManifest_File], error) {
+	if f.m.HasExternalManifestSSTs {
+		storeFile := storageccl.StoreFile{
+			Store:    f.store,
+			FilePath: f.fileSSTPath,
+		}
+		var encOpts *roachpb.FileEncryptionOptions
+		if f.encryption != nil {
+			key, err := backupencryption.GetEncryptionKey(ctx, f.encryption, f.kmsEnv)
+			if err != nil {
+				return nil, err
+			}
+			encOpts = &roachpb.FileEncryptionOptions{Key: key}
+		}
+		return NewFileSSTIter(ctx, storeFile, encOpts)
+	}
+
+	return newSlicePointerIterator(f.m.Files), nil
+}
+
+// NewDescIter creates a new Iterator over Descriptors.
+func (f *IterFactory) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] {
+	if f.m.HasExternalManifestSSTs {
+		backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption, true, f.kmsEnv)
+		it := DescIterator{
+			backing: backing,
+		}
+		it.Next()
+		return &it
+	}
+
+	return newSlicePointerIterator(f.m.Descriptors)
+}
+
+// NewDescriptorChangesIter creates a new Iterator over
+// BackupManifest_DescriptorRevisions. It is assumed that descriptor changes are
+// sorted by DescChangesLess.
+func (f *IterFactory) NewDescriptorChangesIter(
+	ctx context.Context,
+) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] {
+	if f.m.HasExternalManifestSSTs {
+		if f.m.MVCCFilter == backuppb.MVCCFilter_Latest {
+			// If the manifest is backuppb.MVCCFilter_Latest, then return an empty
+			// iterator for descriptor changes.
+			var backing []backuppb.BackupManifest_DescriptorRevision
+			return newSlicePointerIterator(backing)
+		}
+
+		backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption,
+			false, f.kmsEnv)
+		dri := DescriptorRevisionIterator{
+			backing: backing,
+		}
+
+		dri.Next()
+		return &dri
+	}
+
+	return newSlicePointerIterator(f.m.DescriptorChanges)
+}
+
+// GetBackupManifestIterFactories constructs a mapping from the index of the
+// backup layer to an IterFactory.
+func GetBackupManifestIterFactories( + ctx context.Context, + storeFactory cloud.ExternalStorageFactory, + backupManifests []backuppb.BackupManifest, + encryption *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, +) (map[int]*IterFactory, error) { + layerToFileIterFactory := make(map[int]*IterFactory) + for layer := range backupManifests { + es, err := storeFactory(ctx, backupManifests[layer].Dir) + if err != nil { + return nil, err + } + + f := NewIterFactory(&backupManifests[layer], es, encryption, kmsEnv) + layerToFileIterFactory[layer] = f + } + + return layerToFileIterFactory, nil +} diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go new file mode 100644 index 000000000000..b3246203ec5f --- /dev/null +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go @@ -0,0 +1,300 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupinfo_test + +import ( + "context" + "fmt" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/cloud" + _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestManifestHandlingIteratorOperations tests operations for iterators over +// the external SSTs of a backup manifest. 
+func TestManifestHandlingIteratorOperations(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numFiles = 10
+	const numDescriptors = 10
+	const changesPerDescriptor = 3
+
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///foo",
+		base.ExternalIODirConfig{},
+		tc.Server(0).ClusterSettings(),
+		blobs.TestEmptyBlobClientFactory,
+		username.RootUserName(),
+		tc.Server(0).InternalExecutor().(*sql.InternalExecutor),
+		tc.Server(0).InternalExecutorFactory().(*sql.InternalExecutorFactory),
+		tc.Server(0).DB(),
+		nil, /* limiters */
+		cloud.NilMetrics,
+	)
+	require.NoError(t, err)
+
+	m := makeMockManifest(numFiles, numDescriptors, changesPerDescriptor)
+	require.NoError(t, backupinfo.WriteMetadataWithExternalSSTs(ctx, store, nil, nil, &m))
+
+	iterFactory := backupinfo.NewIterFactory(&m, store, nil, nil)
+
+	fileLess := func(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) bool {
+		return backupinfo.FileCmp(left, right) < 0
+	}
+	var sortedFiles []backuppb.BackupManifest_File
+	for i := range m.Files {
+		sortedFiles = append(sortedFiles, m.Files[i])
+	}
+	sort.Slice(sortedFiles, func(i, j int) bool {
+		return fileLess(sortedFiles[i], sortedFiles[j])
+	})
+
+	descLess := func(left descpb.Descriptor, right descpb.Descriptor) bool {
+		tLeft, _, _, _, _ := descpb.GetDescriptors(&left)
+		tRight, _, _, _, _ := descpb.GetDescriptors(&right)
+		return tLeft.ID < tRight.ID
+	}
+	var sortedDescs []descpb.Descriptor
+	for i := range m.Descriptors {
+		sortedDescs = append(sortedDescs, m.Descriptors[i])
+	}
+	sort.Slice(sortedDescs, func(i, j int) bool {
+		return descLess(sortedDescs[i], sortedDescs[j])
+	})
+
+	descRevsLess := func(
+		left backuppb.BackupManifest_DescriptorRevision,
+		right backuppb.BackupManifest_DescriptorRevision,
+	) bool {
+		return backupinfo.DescChangesLess(&left, &right)
+	}
+	var sortedDescRevs []backuppb.BackupManifest_DescriptorRevision
+	for i := range m.DescriptorChanges {
+		sortedDescRevs = append(sortedDescRevs, m.DescriptorChanges[i])
+	}
+	sort.Slice(sortedDescRevs, func(i, j int) bool {
+		return descRevsLess(sortedDescRevs[i], sortedDescRevs[j])
+	})
+
+	t.Run("files", func(t *testing.T) {
+		checkIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory), sortedFiles, fileLess)
+	})
+	t.Run("descriptors", func(t *testing.T) {
+		checkIteratorOperations(t, iterFactory.NewDescIter, sortedDescs, descLess)
+	})
+	t.Run("descriptor-changes", func(t *testing.T) {
+		checkIteratorOperations(t, iterFactory.NewDescriptorChangesIter, sortedDescRevs, descRevsLess)
+	})
+}
+
+// TestManifestHandlingEmptyIterators tests operations for an empty external
+// manifest SST iterator.
+func TestManifestHandlingEmptyIterators(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///foo", + base.ExternalIODirConfig{}, + tc.Server(0).ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + username.RootUserName(), + tc.Server(0).InternalExecutor().(*sql.InternalExecutor), + tc.Server(0).InternalExecutorFactory().(*sql.InternalExecutorFactory), + tc.Server(0).DB(), + nil, /* limiters */ + cloud.NilMetrics, + ) + require.NoError(t, err) + + m := makeMockManifest(0, 0, 0) + require.NoError(t, backupinfo.WriteMetadataWithExternalSSTs(ctx, store, nil, nil, &m)) + + iterFactory := backupinfo.NewIterFactory(&m, store, nil, nil) + t.Run("files", func(t *testing.T) { + checkEmptyIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory)) + }) + t.Run("descriptors", func(t *testing.T) { + checkEmptyIteratorOperations(t, iterFactory.NewDescIter) + }) + t.Run("descriptor-changes", func(t *testing.T) { + checkEmptyIteratorOperations(t, iterFactory.NewDescriptorChangesIter) + }) +} + +func makeMockManifest( + numFiles int, numDescriptors int, changesPerDescriptor int, +) backuppb.BackupManifest { + m := backuppb.BackupManifest{} + m.HasExternalManifestSSTs = true + m.MVCCFilter = backuppb.MVCCFilter_All + for i := 0; i < numFiles; i++ { + spKey := fmt.Sprintf("/Table/%04d", i) + spEndKey := fmt.Sprintf("/Table/%04d", i+1) + f := backuppb.BackupManifest_File{ + Span: roachpb.Span{ + Key: []byte(spKey), + EndKey: []byte(spEndKey), + }, + Path: fmt.Sprintf("file%04d.sst", i), + } + m.Files = append(m.Files, f) + } + + for i := 1; i <= numDescriptors; i++ { + // Have some deleted descriptors as well. + isDeleted := i%5 == 4 + + tbl := descpb.TableDescriptor{ID: descpb.ID(i), + Name: fmt.Sprintf("table%d", i), + Version: descpb.DescriptorVersion(changesPerDescriptor), + } + desc := descpb.Descriptor{Union: &descpb.Descriptor_Table{Table: &tbl}} + if !isDeleted { + m.Descriptors = append(m.Descriptors, desc) + } + + for j := 1; j <= changesPerDescriptor; j++ { + tbl.Version = descpb.DescriptorVersion(j) + rev := backuppb.BackupManifest_DescriptorRevision{ + Time: hlc.Timestamp{WallTime: int64(j)}, + ID: tbl.ID, + Desc: &desc, + } + + if isDeleted && j == changesPerDescriptor { + rev.Desc = nil + } + m.DescriptorChanges = append(m.DescriptorChanges, rev) + } + } + + return m +} + +func checkIteratorOperations[T any]( + t *testing.T, + mkIter func(context.Context) bulk.Iterator[*T], + expected []T, + less func(left T, right T) bool, +) { + ctx := context.Background() + + // 1. Check if the iterator returns the expected contents, regardless of how + // many times value is called between calls to Next(). + for numValueCalls := 1; numValueCalls <= 5; numValueCalls++ { + var actual []T + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + var value T + for i := 0; i < numValueCalls; i++ { + value = *it.Value() + } + + actual = append(actual, value) + } + + sort.Slice(actual, func(i, j int) bool { + return less(actual[i], actual[j]) + }) + + require.Equal(t, expected, actual, fmt.Sprintf("contents not equal if there are %d calls to Value()", numValueCalls)) + } + + // 2. Check that we can repeatedly call Next() and Value() after the iterator + // is done. 
+ it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + } + + for i := 0; i < 10; i++ { + it.Next() + ok, err := it.Valid() + require.False(t, ok) + require.NoError(t, err) + + it.Value() // Should not error or panic. + } + + // 3. Check that we can get the value without calling Valid(). + itNoCheck := mkIter(ctx) + defer itNoCheck.Close() + require.Greater(t, len(expected), 0) + value := itNoCheck.Value() + require.Contains(t, expected, *value) + + ok, err := itNoCheck.Valid() + require.True(t, ok) + require.NoError(t, err) +} + +func checkEmptyIteratorOperations[T any]( + t *testing.T, mkIter func(context.Context) bulk.Iterator[*T], +) { + ctx := context.Background() + + // Check that regardless of how many calls to Next() the iterator will not be + // valid. + for numNextCalls := 0; numNextCalls < 5; numNextCalls++ { + it := mkIter(ctx) + defer it.Close() + for i := 0; i < numNextCalls; i++ { + it.Next() + } + + ok, err := it.Valid() + require.NoError(t, err) + require.False(t, ok) + + it.Value() // Should not error or panic. + } +} + +func mustCreateFileIterFactory( + t *testing.T, iterFactory *backupinfo.IterFactory, +) func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { + return func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { + it, err := iterFactory.NewFileIter(ctx) + require.NoError(t, err) + return it + } +} diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index 0f2185294af8..e3fbb96bbf4c 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -131,12 +131,13 @@ message BackupManifest { int32 descriptor_coverage = 22 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.DescriptorCoverage"]; - // HasExternalFilesList is set to true if the backup manifest has its `Files` - // field nil'ed out and written as a supporting SST file instead. + // HasExternalManifestSSTs is set to true if the backup manifest has its + // `Files`, `Descriptors`, and DescriptorChanges fields nil'ed out and written + // as a supporting SST file instead. // // TODO(adityamaru): Delete when backwards compatibility with 22.2 is dropped // since all backups in 23.1+ will write slim manifests. 
- bool has_external_files_list = 27; + bool has_external_manifest_ssts = 27 [(gogoproto.customname) = "HasExternalManifestSSTs"]; // NEXT ID: 28 } diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 69dd87441e5c..517fb8df5da4 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -13,6 +13,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -83,7 +84,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(b, err) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg.DistSQLSrv.ExternalStorage, + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(b, err) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index ec5748053353..2107e965034d 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -613,7 +614,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) { if err != nil { t.Fatal(err) } - if info.Name() == backupbase.BackupManifestName || !strings.HasSuffix(path, ".sst") { + if info.Name() == backupbase.BackupManifestName || !strings.HasSuffix(path, ".sst") || info.Name() == backupinfo.BackupMetadataDescriptorsListPath || info.Name() == backupinfo.BackupMetadataFilesListPath { return nil } return os.Remove(path) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 7a3c2e77e83f..3a51c8f3fd9c 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -10,11 +10,13 @@ package backupccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -227,7 +229,7 @@ func (gssp *generativeSplitAndScatterProcessor) close() { func makeBackupMetadata( ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.GenerativeSplitAndScatterSpec, -) ([]backuppb.BackupManifest, layerToBackupManifestFileIterFactory, error) { +) ([]backuppb.BackupManifest, backupinfo.LayerToBackupManifestFileIterFactory, error) { kmsEnv := backupencryption.MakeBackupKMSEnv(flowCtx.Cfg.Settings, &flowCtx.Cfg.ExternalIODirConfig, flowCtx.Cfg.DB, spec.User(), flowCtx.Cfg.Executor) @@ 
-238,7 +240,7 @@ func makeBackupMetadata( return nil, nil, err } - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, flowCtx.Cfg.ExternalStorage, + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, flowCtx.Cfg.ExternalStorage, backupManifests, spec.Encryption, &kmsEnv) if err != nil { return nil, nil, err diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go index a19d53bce1d6..0b63fe14c599 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -54,13 +53,12 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { // has been processed by the generative split and scatterer. s0 := tc.Server(0) registry := tc.Server(0).JobRegistry().(*jobs.Registry) - execCfg := s0.ExecutorConfig().(sql.ExecutorConfig) + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{ - Settings: st, - DB: s0.InternalDB().(descs.DB), - JobRegistry: registry, - ExecutorConfig: &execCfg, + Settings: st, + DB: s0.DB(), + JobRegistry: registry, TestingKnobs: execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ RunAfterSplitAndScatteringEntry: func(ctx context.Context) { @@ -68,9 +66,10 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { }, }, }, + ExternalStorageFromURI: execCfg.DistSQLSrv.ExternalStorageFromURI, + ExternalStorage: execCfg.DistSQLSrv.ExternalStorage, }, EvalCtx: &evalCtx, - Mon: evalCtx.TestingMon, DiskMonitor: testDiskMonitor, NodeID: evalCtx.NodeID, } @@ -108,8 +107,8 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, tableRekeys, nil, false) require.NoError(t, err) - chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} - chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} + chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB, kr)} + chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB, kr)} // Large enough so doneScatterCh never blocks. doneScatterCh := make(chan entryNode, 1000) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 232f0a45f063..aa0d72acf4cf 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -281,10 +281,9 @@ func restore( // which are grouped by keyrange. 
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, - backupManifests, encryption, kmsEnv) + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, backupManifests, encryption, kmsEnv) if err != nil { - return emptyRowCount, err + return roachpb.RowCount{}, err } simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) @@ -318,7 +317,7 @@ func restore( ctx, dataToRestore.getSpans(), backupManifests, - layerToBackupManifestFileIterFactory, + layerToIterFactory, backupLocalityMap, introducedSpanFrontier, highWaterMark, @@ -493,7 +492,13 @@ func loadBackupSQLDescs( return nil, backuppb.BackupManifest{}, nil, 0, err } - allDescs, latestBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, details.EndTime) + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, + backupManifests, encryption, kmsEnv) + if err != nil { + return nil, backuppb.BackupManifest{}, nil, 0, err + } + + allDescs, latestBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, backupManifests, layerToBackupManifestFileIterFactory, details.EndTime) if err != nil { return nil, backuppb.BackupManifest{}, nil, 0, err } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index d66d96eaa828..87030a067407 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -799,15 +799,30 @@ func maybeUpgradeDescriptors(descs []catalog.Descriptor, skipFKsWithNoMatchingTa // "other" table is missing from the set provided are omitted during the // upgrade, instead of causing an error to be returned. func maybeUpgradeDescriptorsInBackupManifests( - backupManifests []backuppb.BackupManifest, skipFKsWithNoMatchingTable bool, + ctx context.Context, + backupManifests []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, + skipFKsWithNoMatchingTable bool, ) error { if len(backupManifests) == 0 { return nil } + // TODO(rui): We do not need to upgrade descriptors that exist in the external + // SST to the 19.2 style because they would've been generated by at least + // version 22.2. Delete this function once backups must use manifests with + // external SSTs. + newDescriptorStyleVersion := roachpb.Version{ + Major: 19, + Minor: 2, + } + if !backupManifests[0].ClusterVersion.Less(newDescriptorStyleVersion) { + return nil + } + descriptors := make([]catalog.Descriptor, 0, len(backupManifests[0].Descriptors)) for i := range backupManifests { - descs, err := backupinfo.BackupManifestDescriptors(&backupManifests[i]) + descs, err := backupinfo.BackupManifestDescriptors(ctx, layerToIterFactory[i], backupManifests[i].EndTime) if err != nil { return err } @@ -1570,10 +1585,24 @@ func doRestorePlan( // be caught by backups. 
wasOffline := make(map[tableAndIndex]hlc.Timestamp) - for _, m := range mainBackupManifests { + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, mainBackupManifests, encryption, &kmsEnv) + if err != nil { + return err + } + + for i, m := range mainBackupManifests { spans := roachpb.Spans(m.Spans) - for i := range m.Descriptors { - table, _, _, _, _ := descpb.GetDescriptors(&m.Descriptors[i]) + descIt := layerToIterFactory[i].NewDescIter(ctx) + defer descIt.Close() + + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return err + } else if !ok { + break + } + + table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()) if table == nil { continue } @@ -1598,7 +1627,7 @@ func doRestorePlan( } sqlDescs, restoreDBs, descsByTablePattern, tenants, err := selectTargets( - ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, + ctx, p, mainBackupManifests, layerToIterFactory, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, ) if err != nil { return errors.Wrap(err, @@ -1606,7 +1635,7 @@ func doRestorePlan( "use SHOW BACKUP to find correct targets") } - if err := checkMissingIntroducedSpans(sqlDescs, mainBackupManifests, endTime, backupCodec); err != nil { + if err := checkMissingIntroducedSpans(ctx, sqlDescs, mainBackupManifests, layerToIterFactory, endTime, backupCodec); err != nil { return err } diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index cdd4057ba9b5..181b474cf160 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -13,19 +13,15 @@ import ( "context" "sort" - "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" - "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" - "github.com/cockroachdb/cockroach/pkg/cloud" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" - "github.com/cockroachdb/errors" ) type intervalSpan roachpb.Span @@ -67,79 +63,6 @@ type backupManifestFileIterator interface { peek() (backuppb.BackupManifest_File, bool) err() error close() - reset() -} - -// inMemoryFileIterator iterates over the `BackupManifest_Files` field stored -// in-memory in the manifest. 
-type inMemoryFileIterator struct { - manifest *backuppb.BackupManifest - curIdx int -} - -func (i *inMemoryFileIterator) next() (backuppb.BackupManifest_File, bool) { - f, hasNext := i.peek() - i.curIdx++ - return f, hasNext -} - -func (i *inMemoryFileIterator) peek() (backuppb.BackupManifest_File, bool) { - if i.curIdx >= len(i.manifest.Files) { - return backuppb.BackupManifest_File{}, false - } - f := i.manifest.Files[i.curIdx] - return f, true -} - -func (i *inMemoryFileIterator) err() error { - return nil -} - -func (i *inMemoryFileIterator) close() {} - -func (i *inMemoryFileIterator) reset() { - i.curIdx = 0 -} - -var _ backupManifestFileIterator = &inMemoryFileIterator{} - -// makeBackupManifestFileIterator returns a backupManifestFileIterator that can -// be used to iterate over the `BackupManifest_Files` of the manifest. -func makeBackupManifestFileIterator( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - m backuppb.BackupManifest, - encryption *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, -) (backupManifestFileIterator, error) { - if m.HasExternalFilesList { - es, err := storeFactory(ctx, m.Dir) - if err != nil { - return nil, err - } - storeFile := storageccl.StoreFile{ - Store: es, - FilePath: backupinfo.BackupMetadataFilesListPath, - } - var encOpts *roachpb.FileEncryptionOptions - if encryption != nil { - key, err := backupencryption.GetEncryptionKey(ctx, encryption, kmsEnv) - if err != nil { - return nil, err - } - encOpts = &roachpb.FileEncryptionOptions{Key: key} - } - it, err := backupinfo.NewFileSSTIter(ctx, storeFile, encOpts) - if err != nil { - return nil, errors.Wrap(err, "failed to create new FileSST iterator") - } - return &sstFileIterator{fi: it}, nil - } - - return &inMemoryFileIterator{ - manifest: &m, - curIdx: 0, - }, nil } // sstFileIterator uses an underlying `backupinfo.FileIterator` to read the @@ -173,10 +96,6 @@ func (s *sstFileIterator) close() { s.fi.Close() } -func (s *sstFileIterator) reset() { - s.fi.Reset() -} - var _ backupManifestFileIterator = &sstFileIterator{} // makeSimpleImportSpans partitions the spans of requiredSpans into a covering @@ -209,9 +128,10 @@ var _ backupManifestFileIterator = &sstFileIterator{} // if its current data size plus that of the new span is less than the target // size. func makeSimpleImportSpans( + ctx context.Context, requiredSpans roachpb.Spans, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, introducedSpanFrontier *spanUtils.Frontier, lowWaterMark roachpb.Key, @@ -266,18 +186,24 @@ func makeSimpleImportSpans( // we reach out to ExternalStorage to read the accompanying SST that // contains the BackupManifest_Files. iterFactory := layerToBackupManifestFileIterFactory[layer] - it, err := iterFactory() + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() + defer it.Close() covPos := spanCoverStart // lastCovSpanSize is the size of files added to the right-most span of // the cover so far. 
var lastCovSpanSize int64 - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + f := it.Value() if sp := span.Intersect(f.Span); sp.Valid() { fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { @@ -342,9 +268,6 @@ func makeSimpleImportSpans( break } } - if err := it.err(); err != nil { - return nil, err - } } } @@ -383,32 +306,6 @@ func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrap } } -type layerToBackupManifestFileIterFactory map[int]func() (backupManifestFileIterator, error) - -// getBackupManifestFileIters constructs a mapping from the idx of the backup -// layer to a factory method to construct a backupManifestFileIterator. This -// iterator can be used to iterate over the `BackupManifest_Files` in a -// `BackupManifest`. It is the callers responsibility to close the returned -// iterators. -func getBackupManifestFileIters( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - backupManifests []backuppb.BackupManifest, - encryption *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, -) (map[int]func() (backupManifestFileIterator, error), error) { - layerToFileIterFactory := make(map[int]func() (backupManifestFileIterator, error)) - for layer := range backupManifests { - layer := layer - layerToFileIterFactory[layer] = func() (backupManifestFileIterator, error) { - manifest := backupManifests[layer] - return makeBackupManifestFileIterator(ctx, storeFactory, manifest, encryption, kmsEnv) - } - } - - return layerToFileIterFactory, nil -} - // generateAndSendImportSpans partitions the spans of requiredSpans into a // covering of RestoreSpanEntry's which each have all overlapping files from the // passed backups assigned to them. 
The spans of requiredSpans are @@ -454,7 +351,7 @@ func generateAndSendImportSpans( ctx context.Context, requiredSpans roachpb.Spans, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, introducedSpanFrontier *spanUtils.Frontier, lowWaterMark roachpb.Key, @@ -463,7 +360,7 @@ func generateAndSendImportSpans( useSimpleImportSpans bool, ) error { if useSimpleImportSpans { - importSpans, err := makeSimpleImportSpans(requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) if err != nil { return err } @@ -474,14 +371,14 @@ func generateAndSendImportSpans( return nil } - startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(backups, layerToBackupManifestFileIterFactory) + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(ctx, backups, layerToBackupManifestFileIterFactory) if err != nil { return err } - fileIterByLayer := make([]backupManifestFileIterator, 0, len(backups)) + fileIterByLayer := make([]bulk.Iterator[*backuppb.BackupManifest_File], 0, len(backups)) for layer := range backups { - iter, err := layerToBackupManifestFileIterFactory[layer]() + iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx) if err != nil { return err } @@ -493,7 +390,7 @@ func generateAndSendImportSpans( // the cover so far. var lastCovSpanSize int64 var lastCovSpan roachpb.Span - var covFilesByLayer [][]backuppb.BackupManifest_File + var covFilesByLayer [][]*backuppb.BackupManifest_File var firstInSpan bool flush := func(ctx context.Context) error { @@ -591,7 +488,7 @@ func generateAndSendImportSpans( return err } - var filesByLayer [][]backuppb.BackupManifest_File + var filesByLayer [][]*backuppb.BackupManifest_File var covSize int64 var newCovFilesSize int64 @@ -694,16 +591,18 @@ func generateAndSendImportSpans( // [a, b, c, e, f, g] type fileSpanStartAndEndKeyIterator struct { heap *fileHeap - allIters []backupManifestFileIterator + allIters []bulk.Iterator[*backuppb.BackupManifest_File] err error } func newFileSpanStartAndEndKeyIterator( - backups []backuppb.BackupManifest, layerToIterFactory layerToBackupManifestFileIterFactory, + ctx context.Context, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, ) (*fileSpanStartAndEndKeyIterator, error) { it := &fileSpanStartAndEndKeyIterator{} for layer := range backups { - iter, err := layerToIterFactory[layer]() + iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx) if err != nil { return nil, err } @@ -730,14 +629,13 @@ func (i *fileSpanStartAndEndKeyIterator) next() { } if minItem.cmpEndKey { - file, ok := minItem.fileIter.next() - if err := minItem.fileIter.err(); err != nil { + minItem.fileIter.Next() + if ok, err := minItem.fileIter.Valid(); err != nil { i.err = err return - } - if ok { + } else if ok { minItem.cmpEndKey = false - minItem.file = file + minItem.file = minItem.fileIter.Value() heap.Push(i.heap, minItem) } } else { @@ -766,27 +664,25 @@ func (i *fileSpanStartAndEndKeyIterator) reset() { i.err = nil for _, iter := range i.allIters { - iter.reset() - - file, ok := iter.next() - if err := 
iter.err(); err != nil { + if ok, err := iter.Valid(); err != nil { i.err = err return + } else if !ok { + continue } - if ok { - i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ - fileIter: iter, - file: file, - cmpEndKey: false, - }) - } + + i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ + fileIter: iter, + file: iter.Value(), + cmpEndKey: false, + }) } heap.Init(i.heap) } type fileHeapItem struct { - fileIter backupManifestFileIterator - file backuppb.BackupManifest_File + fileIter bulk.Iterator[*backuppb.BackupManifest_File] + file *backuppb.BackupManifest_File cmpEndKey bool } @@ -831,19 +727,23 @@ func (f *fileHeap) Pop() any { } func getNewIntersectingFilesByLayer( - span roachpb.Span, layersCoveredLater map[int]bool, fileIters []backupManifestFileIterator, -) ([][]backuppb.BackupManifest_File, error) { - var files [][]backuppb.BackupManifest_File + span roachpb.Span, + layersCoveredLater map[int]bool, + fileIters []bulk.Iterator[*backuppb.BackupManifest_File], +) ([][]*backuppb.BackupManifest_File, error) { + var files [][]*backuppb.BackupManifest_File for l, iter := range fileIters { - var layerFiles []backuppb.BackupManifest_File + var layerFiles []*backuppb.BackupManifest_File if !layersCoveredLater[l] { - for ; ; iter.next() { - f, ok := iter.peek() - if !ok { + for ; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return nil, err + } else if !ok { break } + f := iter.Value() if span.Overlaps(f.Span) { layerFiles = append(layerFiles, f) } @@ -852,9 +752,6 @@ func getNewIntersectingFilesByLayer( break } } - if iter.err() != nil { - return nil, iter.err() - } } files = append(files, layerFiles) } diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index a6e88764e562..fc4b550e38e0 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -62,7 +64,7 @@ func MockBackupChain( } for i := range backups { - backups[i].HasExternalFilesList = hasExternalFilesList + backups[i].HasExternalManifestSSTs = hasExternalFilesList backups[i].Spans = make(roachpb.Spans, spans) backups[i].IntroducedSpans = make(roachpb.Spans, 0) for j := range backups[i].Spans { @@ -108,15 +110,14 @@ func MockBackupChain( backups[i].Files[f].EntryCounts.DataSize = 1 << 20 } - config := cloudpb.ExternalStorage{S3Config: &cloudpb.ExternalStorage_S3{}} - if backups[i].HasExternalFilesList { + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + if err != nil { + return nil, err + } + config := es.Conf() + if backups[i].HasExternalManifestSSTs { // Write the Files to an SST and put them at a well known location. 
- es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, - fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) - if err != nil { - return nil, err - } - config = es.Conf() manifestCopy := backups[i] err = backupinfo.WriteFilesListSST(ctx, es, nil, nil, &manifestCopy, backupinfo.BackupMetadataFilesListPath) @@ -124,6 +125,13 @@ func MockBackupChain( return nil, err } backups[i].Files = nil + + err = backupinfo.WriteDescsSST(ctx, &manifestCopy, es, nil, nil, backupinfo.BackupMetadataDescriptorsListPath) + if err != nil { + return nil, err + } + backups[i].Descriptors = nil + backups[i].DescriptorChanges = nil } // A non-nil Dir more accurately models the footprint of produced coverings. backups[i].Dir = config @@ -159,9 +167,14 @@ func checkRestoreCovering( return err } + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, storageFactory, backups, nil, nil) + if err != nil { + return err + } + for _, span := range spans { var last roachpb.Key - for _, b := range backups { + for i, b := range backups { var coveredLater bool introducedSpanFrontier.Entries(func(s roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) { @@ -178,13 +191,18 @@ func checkRestoreCovering( // for explanation. continue } - it, err := makeBackupManifestFileIterator(ctx, storageFactory, b, - nil, nil) + it, err := layerToIterFactory[i].NewFileIter(ctx) if err != nil { return err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + break + } + f := it.Value() if sp := span.Intersect(f.Span); sp.Valid() { if required[f.Path] == nil { required[f.Path] = &roachpb.SpanGroup{} @@ -195,9 +213,6 @@ func checkRestoreCovering( } } } - if it.err() != nil { - return it.err() - } } } var spanIdx int @@ -230,7 +245,7 @@ func makeImportSpans( ctx context.Context, spans []roachpb.Span, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, targetSize int64, introducedSpanFrontier *spanUtils.Frontier, useSimpleImportSpans bool, @@ -245,7 +260,7 @@ func makeImportSpans( return nil }) - err := generateAndSendImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + err := generateAndSendImportSpans(ctx, spans, backups, layerToIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) close(spanCh) if err != nil { @@ -257,6 +272,54 @@ func makeImportSpans( return cover, nil } +type coverutils struct { + dir cloudpb.ExternalStorage +} + +func makeCoverUtils(ctx context.Context, t *testing.T, execCfg *sql.ExecutorConfig) coverutils { + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + require.NoError(t, err) + dir := es.Conf() + return coverutils{ + dir: dir, + } +} + +func (c coverutils) sp(start, end string) roachpb.Span { + return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} +} + +func (c coverutils) makeManifests(manifests []roachpb.Spans) []backuppb.BackupManifest { + ms := make([]backuppb.BackupManifest, len(manifests)) + fileCount := 1 + for i, manifest := range manifests { + ms[i].StartTime = hlc.Timestamp{WallTime: int64(i)} + ms[i].EndTime = hlc.Timestamp{WallTime: 
int64(i + 1)} + ms[i].Files = make([]backuppb.BackupManifest_File, len(manifest)) + ms[i].Dir = c.dir + for j, sp := range manifest { + ms[i].Files[j] = backuppb.BackupManifest_File{ + Span: sp, + Path: fmt.Sprintf("%d", fileCount), + + // Pretend every span has 1MB. + EntryCounts: roachpb.RowCount{DataSize: 1 << 20}, + } + fileCount++ + } + } + return ms +} + +func (c coverutils) paths(names ...string) []execinfrapb.RestoreFileSpec { + r := make([]execinfrapb.RestoreFileSpec, len(names)) + for i := range names { + r[i].Path = names[i] + r[i].Dir = c.dir + } + return r +} func TestRestoreEntryCoverExample(t *testing.T) { defer leaktest.AfterTest(t)() @@ -267,28 +330,16 @@ func TestRestoreEntryCoverExample(t *testing.T) { InitManualReplication) defer cleanupFn() - sp := func(start, end string) roachpb.Span { - return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} - } - f := func(start, end, path string) backuppb.BackupManifest_File { - return backuppb.BackupManifest_File{Span: sp(start, end), Path: path} - } - paths := func(names ...string) []execinfrapb.RestoreFileSpec { - r := make([]execinfrapb.RestoreFileSpec, len(names)) - for i := range names { - r[i].Path = names[i] - } - return r - } + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) // Setup and test the example in the comment of makeSimpleImportSpans. - spans := []roachpb.Span{sp("a", "f"), sp("f", "i"), sp("l", "m")} - backups := []backuppb.BackupManifest{ - {Files: []backuppb.BackupManifest_File{f("a", "c", "1"), f("c", "e", "2"), f("h", "i", "3")}}, - {Files: []backuppb.BackupManifest_File{f("b", "d", "4"), f("g", "i", "5")}}, - {Files: []backuppb.BackupManifest_File{f("a", "h", "6"), f("j", "k", "7")}}, - {Files: []backuppb.BackupManifest_File{f("h", "i", "8"), f("l", "m", "9")}}, - } + spans := []roachpb.Span{c.sp("a", "f"), c.sp("f", "i"), c.sp("l", "m")} + backups := c.makeManifests([]roachpb.Spans{ + {c.sp("a", "c"), c.sp("c", "e"), c.sp("h", "i")}, + {c.sp("b", "d"), c.sp("g", "i")}, + {c.sp("a", "h"), c.sp("j", "k")}, + {c.sp("h", "i"), c.sp("l", "m")}}) for i := range backups { backups[i].StartTime = hlc.Timestamp{WallTime: int64(i)} @@ -303,48 +354,157 @@ func TestRestoreEntryCoverExample(t *testing.T) { emptySpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{}) require.NoError(t, err) - execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg.DistSQLSrv.ExternalStorage, + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, emptySpanFrontier, false) + cover, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "b"), Files: paths("1", "6")}, - {Span: sp("b", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "f"), Files: paths("2", "4", "6")}, - {Span: sp("f", "g"), Files: paths("6")}, - {Span: sp("g", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: 
c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, cover) - coverSized, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, 2<<20, emptySpanFrontier, false) + coverSized, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, 2<<20, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "b"), Files: paths("1", "6")}, - {Span: sp("b", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "f"), Files: paths("2", "4", "6")}, - {Span: sp("f", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, coverSized) // check that introduced spans are properly elided - backups[2].IntroducedSpans = []roachpb.Span{sp("a", "f")} + backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, introducedSpanFrontier, false) + coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, introducedSpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "f"), Files: paths("6")}, - {Span: sp("f", "g"), Files: paths("6")}, - {Span: sp("g", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, coverIntroduced) +} + +func TestFileSpanStartKeyIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + type testSpec struct { + manifestFiles []roachpb.Spans + keysSurfaced []string + expectedError string + } + + for _, sp := range []testSpec{ + { + // adjacent and disjoint files. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "b"), c.sp("c", "d"), c.sp("d", "e")}, + }, + keysSurfaced: []string{"a", "b", "c", "d", "e"}, + }, + { + // shadow start key (b) if another span covers it. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "c"), c.sp("b", "d")}, + }, + keysSurfaced: []string{"a", "c", "d"}, + }, + { + // swap the file order and expect an error. + manifestFiles: []roachpb.Spans{ + {c.sp("b", "d"), c.sp("a", "c")}, + }, + keysSurfaced: []string{"b", "d", "a", "c"}, + expectedError: "out of order backup keys", + }, + { + // overlapping files within a level. 
+ manifestFiles: []roachpb.Spans{ + {c.sp("b", "f"), c.sp("c", "d"), c.sp("e", "g")}, + }, + keysSurfaced: []string{"b", "f", "g"}, + }, + { + // overlapping files within and across levels. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "e"), c.sp("d", "f")}, + {c.sp("b", "c")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "f"}, + }, + { + // overlapping start key in one level, but non overlapping in another level. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "c"), c.sp("b", "d")}, + {c.sp("b", "c")}, + }, + keysSurfaced: []string{"a", "b", "c", "d"}, + }, + { + // overlapping files in both levels. + manifestFiles: []roachpb.Spans{ + {c.sp("b", "e"), c.sp("d", "i")}, + {c.sp("a", "c"), c.sp("b", "h")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "h", "i"}, + }, + { + // ensure everything works with 3 layers. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "e"), c.sp("e", "f")}, + {c.sp("b", "e"), c.sp("e", "f")}, + {c.sp("c", "e"), c.sp("d", "f")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "f"}, + }, + } { + backups := c.makeManifests(sp.manifestFiles) + + // randomly shuffle the order of the manifests, as order should not matter. + for i := range backups { + j := rand.Intn(i + 1) + backups[i], backups[j] = backups[j], backups[i] + } + + // ensure all the expected keys are surfaced. + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, + backups, nil, nil) + require.NoError(t, err) + + sanityCheckFileIterator(ctx, t, layerToBackupManifestFileIterFactory[0], backups[0]) + + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(ctx, backups, layerToBackupManifestFileIterFactory) + require.NoError(t, err) + + for _, expectedKey := range sp.keysSurfaced { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + require.Error(t, err, sp.expectedError) + } + break + } + expected := roachpb.Key(expectedKey) + require.Equal(t, expected, startEndKeyIt.value()) + startEndKeyIt.next() + } + } } type mockBackupInfo struct { @@ -399,8 +559,13 @@ func createMockManifest( files = append(files, backuppb.BackupManifest_File{Span: sp, Path: path}) } + ctx := context.Background() + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + require.NoError(t, err) + return backuppb.BackupManifest{Spans: spans, - EndTime: endTime, Files: files} + EndTime: endTime, Files: files, Dir: es.Conf()} } // TestRestoreEntryCoverReIntroducedSpans checks that all reintroduced spans are @@ -545,10 +710,10 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToBackupManifestFileIterFactory, + cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToIterFactory, 0, introducedSpanFrontier, false) require.NoError(t, err) @@ -573,6 +738,31 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { } } +// sanityCheckFileIterator ensures the backup files are surfaced in the order they are stored in +// the manifest. 
+func sanityCheckFileIterator( + ctx context.Context, + t *testing.T, + iterFactory *backupinfo.IterFactory, + backup backuppb.BackupManifest, +) { + iter, err := iterFactory.NewFileIter(ctx) + require.NoError(t, err) + defer iter.Close() + + for _, expectedFile := range backup.Files { + if ok, err := iter.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + t.Fatalf("file iterator should have file with path %s", expectedFile.Path) + } + + file := iter.Value() + require.Equal(t, expectedFile, *file) + iter.Next() + } +} + func TestRestoreEntryCover(t *testing.T) { defer leaktest.AfterTest(t)() r, _ := randutil.NewTestRand() @@ -588,18 +778,19 @@ func TestRestoreEntryCover(t *testing.T) { for _, simpleImportSpans := range []bool{true, false} { backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) require.NoError(t, err) - + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, + execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) + require.NoError(t, err) + randLayer := rand.Intn(len(backups)) + randBackup := backups[randLayer] + sanityCheckFileIterator(ctx, t, layerToIterFactory[randLayer], randBackup) for _, target := range []int64{0, 1, 4, 100, 1000} { t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t, simple=%t", numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, - execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) - require.NoError(t, err) cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) + layerToIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) require.NoError(t, err) require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index 9a67dd0cab77..1ff260f8b067 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -104,7 +104,7 @@ func (m manifestInfoReader) showBackup( // FKs for which we can't resolve the cross-table references. We can't // display them anyway, because we don't have the referenced table names, // etc. - err := maybeUpgradeDescriptorsInBackupManifests(info.manifests, + err := maybeUpgradeDescriptorsInBackupManifests(ctx, info.manifests, info.layerToIterFactory, true /* skipFKsWithNoMatchingTable */) if err != nil { return err @@ -444,6 +444,12 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o return err } } + + info.layerToIterFactory, err = backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, info.manifests, info.enc, info.kmsEnv) + if err != nil { + return err + } + // If backup is locality aware, check that user passed at least some localities. // TODO (msbutler): this is an extremely crude check that the user is @@ -572,13 +578,19 @@ func checkBackupFiles( // Check all backup SSTs. 
fileSizes := make([]int64, 0) - it, err := makeBackupManifestFileIterator(ctx, execCfg.DistSQLSrv.ExternalStorage, - info.manifests[layer], encryption, kmsEnv) + it, err := info.layerToIterFactory[layer].NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() store := defaultStore uri := info.defaultURIs[layer] if _, ok := localityStores[f.LocalityKV]; ok { @@ -599,9 +611,6 @@ func checkBackupFiles( } fileSizes = append(fileSizes, sz) } - if it.err() != nil { - return nil, it.err() - } return fileSizes, nil } @@ -636,11 +645,14 @@ type backupInfo struct { collectionURI string defaultURIs []string manifests []backuppb.BackupManifest - subdir string - localityInfo []jobspb.RestoreDetails_BackupLocalityInfo - enc *jobspb.BackupEncryptionOptions - kmsEnv cloud.KMSEnv - fileSizes [][]int64 + // layerToIterFactory is a mapping from the index of the backup layer in + // manifests to its IterFactory. + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory + subdir string + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo + enc *jobspb.BackupEncryptionOptions + kmsEnv cloud.KMSEnv + fileSizes [][]int64 } type backupShower struct { @@ -704,7 +716,7 @@ func backupShowerDefault(p sql.PlanHookState, showSchemas bool, opts tree.KVOpti var rows []tree.Datums for layer, manifest := range info.manifests { - descriptors, err := backupinfo.BackupManifestDescriptors(&manifest) + descriptors, err := backupinfo.BackupManifestDescriptors(ctx, info.layerToIterFactory[layer], manifest.EndTime) if err != nil { return nil, err } @@ -754,7 +766,7 @@ func backupShowerDefault(p sql.PlanHookState, showSchemas bool, opts tree.KVOpti fileSizes = info.fileSizes[layer] } - tableSizes, err := getTableSizes(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, info, manifest, fileSizes) + tableSizes, err := getTableSizes(ctx, info.layerToIterFactory[layer], fileSizes) if err != nil { return nil, err } @@ -927,27 +939,27 @@ type descriptorSize struct { // BackupManifest_File identifies a span in an SST and there can be multiple // spans stored in an SST. 
func getLogicalSSTSize( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - manifest backuppb.BackupManifest, - enc *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, + ctx context.Context, iterFactory *backupinfo.IterFactory, ) (map[string]int64, error) { ctx, span := tracing.ChildSpan(ctx, "backupccl.getLogicalSSTSize") defer span.Finish() sstDataSize := make(map[string]int64) - it, err := makeBackupManifestFileIterator(ctx, storeFactory, manifest, enc, kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() sstDataSize[f.Path] += f.EntryCounts.DataSize } - if it.err() != nil { - return nil, it.err() - } return sstDataSize, nil } @@ -960,30 +972,33 @@ func approximateSpanPhysicalSize( // getTableSizes gathers row and size count for each table in the manifest func getTableSizes( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - info backupInfo, - manifest backuppb.BackupManifest, - fileSizes []int64, + ctx context.Context, iterFactory *backupinfo.IterFactory, fileSizes []int64, ) (map[descpb.ID]descriptorSize, error) { ctx, span := tracing.ChildSpan(ctx, "backupccl.getTableSizes") defer span.Finish() - logicalSSTSize, err := getLogicalSSTSize(ctx, storeFactory, manifest, info.enc, info.kmsEnv) + logicalSSTSize, err := getLogicalSSTSize(ctx, iterFactory) if err != nil { return nil, err } - it, err := makeBackupManifestFileIterator(ctx, storeFactory, manifest, info.enc, info.kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() + defer it.Close() tableSizes := make(map[descpb.ID]descriptorSize) var tenantID roachpb.TenantID var showCodec keys.SQLCodec var idx int - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() if !tenantID.IsSet() { var err error _, tenantID, err = keys.DecodeTenantPrefix(f.Span.Key) @@ -1015,9 +1030,6 @@ func getTableSizes( tableSizes[descpb.ID(tableID)] = s idx++ } - if it.err() != nil { - return nil, it.err() - } return tableSizes, nil } @@ -1129,7 +1141,7 @@ var backupShowerDoctor = backupShower{ var namespaceTable doctor.NamespaceTable // Extract all the descriptors from the given manifest and generate the // namespace and descriptor tables needed by doctor. 
-	descriptors, _, err := backupinfo.LoadSQLDescsFromBackupsAtTime(info.manifests, hlc.Timestamp{})
+	descriptors, _, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, info.manifests, info.layerToIterFactory, hlc.Timestamp{})
 	if err != nil {
 		return nil, err
 	}
@@ -1219,20 +1231,24 @@ func backupShowerFileSetup(
 			backupType = "incremental"
 		}
 
-		logicalSSTSize, err := getLogicalSSTSize(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, manifest,
-			info.enc, info.kmsEnv)
+		logicalSSTSize, err := getLogicalSSTSize(ctx, info.layerToIterFactory[i])
 		if err != nil {
 			return nil, err
 		}
 
-		it, err := makeBackupManifestFileIterator(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage,
-			manifest, info.enc, info.kmsEnv)
+		it, err := info.layerToIterFactory[i].NewFileIter(ctx)
 		if err != nil {
 			return nil, err
 		}
-		defer it.close()
+		defer it.Close()
 		var idx int
-		for file, hasNext := it.next(); hasNext; file, hasNext = it.next() {
+		for ; ; it.Next() {
+			if ok, err := it.Valid(); err != nil {
+				return nil, err
+			} else if !ok {
+				break
+			}
+			file := it.Value()
 			filePath := file.Path
 			if inCol != nil {
 				filePath = path.Join(manifestDirs[i], filePath)
@@ -1263,9 +1279,6 @@ func backupShowerFileSetup(
 			})
 			idx++
 		}
-		if it.err() != nil {
-			return nil, it.err()
-		}
 	}
 	return rows, nil
 },
diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go
index ec0e189b7c23..f577d8190451 100644
--- a/pkg/ccl/backupccl/targets.go
+++ b/pkg/ccl/backupccl/targets.go
@@ -348,8 +348,10 @@ func fullClusterTargetsBackup(
 // mainBackupManifests are sorted by Endtime and this check only applies to
 // backups with a start time that is less than the restore AOST.
 func checkMissingIntroducedSpans(
+	ctx context.Context,
 	restoringDescs []catalog.Descriptor,
 	mainBackupManifests []backuppb.BackupManifest,
+	layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
 	endTime hlc.Timestamp,
 	codec keys.SQLCodec,
 ) error {
@@ -374,12 +376,31 @@ func checkMissingIntroducedSpans(
 
 		// Gather the _online_ tables included in the previous backup.
 		prevOnlineTables := make(map[descpb.ID]struct{})
-		for _, desc := range mainBackupManifests[i-1].Descriptors {
-			if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Public() {
+		prevDescIt := layerToIterFactory[i-1].NewDescIter(ctx)
+		defer prevDescIt.Close()
+		for ; ; prevDescIt.Next() {
+			if ok, err := prevDescIt.Valid(); err != nil {
+				return err
+			} else if !ok {
+				break
+			}
+
+			if table, _, _, _, _ := descpb.GetDescriptors(prevDescIt.Value()); table != nil && table.Public() {
 				prevOnlineTables[table.GetID()] = struct{}{}
 			}
 		}
 
 		// Gather the tables that were reintroduced in the current backup (i.e.
 		// backed up from ts=0).
 		tablesIntroduced := make(map[descpb.ID]struct{})
@@ -430,10 +451,19 @@ that was running an IMPORT at the time of the previous incremental in this chain
 		})
 	}
 
-	for _, desc := range mainBackupManifests[i].Descriptors {
+	descIt := layerToIterFactory[i].NewDescIter(ctx)
+	defer descIt.Close()
+
+	for ; ; descIt.Next() {
+		if ok, err := descIt.Valid(); err != nil {
+			return err
+		} else if !ok {
+			break
+		}
+
 		// Check that all online tables at backup time were either introduced or
 		// in the previous backup.
- if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Public() { + if table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); table != nil && table.Public() { if err := requiredIntroduction(table); err != nil { return err } @@ -444,8 +474,17 @@ that was running an IMPORT at the time of the previous incremental in this chain // where a descriptor may appear in manifest.DescriptorChanges but not // manifest.Descriptors. If a descriptor switched from offline to online at // any moment during the backup interval, it needs to be reintroduced. - for _, desc := range mainBackupManifests[i].DescriptorChanges { - if table, _, _, _, _ := descpb.GetDescriptors(desc.Desc); table != nil && table.Public() { + descRevIt := layerToIterFactory[i].NewDescriptorChangesIter(ctx) + defer descRevIt.Close() + + for ; ; descRevIt.Next() { + if ok, err := descRevIt.Valid(); err != nil { + return err + } else if !ok { + break + } + + if table, _, _, _, _ := descpb.GetDescriptors(descRevIt.Value().Desc); table != nil && table.Public() { if err := requiredIntroduction(table); err != nil { return err } @@ -470,6 +509,7 @@ func selectTargets( ctx context.Context, p sql.PlanHookState, backupManifests []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, targets tree.BackupTargetList, descriptorCoverage tree.DescriptorCoverage, asOf hlc.Timestamp, @@ -482,7 +522,7 @@ func selectTargets( ) { ctx, span := tracing.ChildSpan(ctx, "backupccl.selectTargets") defer span.Finish() - allDescs, lastBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, asOf) + allDescs, lastBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, backupManifests, layerToIterFactory, asOf) if err != nil { return nil, nil, nil, nil, err } diff --git a/pkg/sql/execinfrapb/api.go b/pkg/sql/execinfrapb/api.go index ecd89228bf4f..b755df7844a5 100644 --- a/pkg/sql/execinfrapb/api.go +++ b/pkg/sql/execinfrapb/api.go @@ -87,6 +87,7 @@ func (m *ChangeFrontierSpec) User() username.SQLUsername { return m.UserProto.Decode() } +// User accesses the user field. func (m *GenerativeSplitAndScatterSpec) User() username.SQLUsername { return m.UserProto.Decode() } diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index a8ccf07aa3af..ba065f15f790 100644 --- a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "bulk", - srcs = ["tracing_aggregator.go"], + srcs = [ + "iterator.go", + "tracing_aggregator.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/util/bulk/iterator.go b/pkg/util/bulk/iterator.go new file mode 100644 index 000000000000..57f7b09dced9 --- /dev/null +++ b/pkg/util/bulk/iterator.go @@ -0,0 +1,46 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +// Iterator is an interface to iterate a collection of objects of type T. +type Iterator[T any] interface { + // Valid must be called after any call to Next(). 
It returns (true, nil) if
+	// the iterator points to a valid value, and (false, nil) if the iterator has
+	// moved past the last value. It returns (false, err) if there is an error in
+	// the iterator.
+	Valid() (bool, error)
+
+	// Value returns the current value. The returned value is only valid until the
+	// next call to Next().
+	Value() T
+
+	// Next advances the iterator to the next value.
+	Next()
+
+	// Close closes the iterator.
+	Close()
+}
+
+// CollectToSlice iterates over all objects in iterator and collects them into a
+// slice.
+func CollectToSlice[T any](iterator Iterator[T]) ([]T, error) {
+	var values []T
+	for ; ; iterator.Next() {
+		if ok, err := iterator.Valid(); err != nil {
+			return nil, err
+		} else if !ok {
+			break
+		}
+
+		values = append(values, iterator.Value())
+	}
+	return values, nil
+}
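
Reviewer note, not part of the patch: every call site converted above follows the same consumption pattern for the new `bulk.Iterator[T]`. A minimal usage sketch, assuming an iterator obtained from `backupinfo.NewIterFactory` as in the tests above (`iterFactory`, `ctx`, and `process` are stand-ins, not names from this change):

	it, err := iterFactory.NewFileIter(ctx)
	if err != nil {
		return err
	}
	defer it.Close()
	for ; ; it.Next() {
		// Valid must be checked after every Next(); it is also where errors
		// from reading the external SST surface.
		if ok, err := it.Valid(); err != nil {
			return err
		} else if !ok {
			break // iterator exhausted
		}
		f := it.Value() // *backuppb.BackupManifest_File; valid only until Next()
		process(f)      // hypothetical consumer of the streamed file entry
	}

Splitting Valid() from Next() keeps error handling in one place and lets Value() stay error-free, which is why the converted loops throughout this patch all take the `for ; ; it.Next()` shape.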