diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 178f54abe4c6..45527c8563e5 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -266,6 +266,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 4f04d3e4483e..bffabd0ded95 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -146,22 +146,26 @@ func backup( // TODO(benesch): verify these files, rather than accepting them as truth // blindly. // No concurrency yet, so these assignments are safe. - it, err := makeBackupManifestFileIterator(ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, - *backupManifest, encryption, &kmsEnv) + iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return roachpb.RowCount{}, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return roachpb.RowCount{}, err + } else if !ok { + break + } + + f := it.Value() if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() { completedIntroducedSpans = append(completedIntroducedSpans, f.Span) } else { completedSpans = append(completedSpans, f.Span) } } - if it.err() != nil { - return roachpb.RowCount{}, it.err() - } // Subtract out any completed spans. spans := filterSpans(backupManifest.Spans, completedSpans) @@ -340,13 +344,11 @@ func backup( // Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy // fields elided from the `BACKUP_MANIFEST`. - // - // TODO(adityamaru,rhu713): Once backup/restore switches from writing and - // reading backup manifests to `metadata.sst` we can stop writing the slim - // manifest. 
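The backup() hunk above replaces the old next()/err() file iterator with the Valid()/Value()/Next()/Close() contract from pkg/util/bulk. A minimal, self-contained sketch of that contract and its canonical consumption loop; the generic interface and slice-backed implementation here are illustrative stand-ins, not the shipped definitions:

```go
package main

import "fmt"

// Iterator mirrors the shape of the pkg/util/bulk contract as used in this
// diff: Valid reports whether Value would return a real element and surfaces
// any error encountered so far.
type Iterator[T any] interface {
	Valid() (bool, error)
	Value() T
	Next()
	Close()
}

// sliceIter is a toy in-memory implementation, analogous to the sliceIterator
// added to backup_metadata.go later in this diff.
type sliceIter[T any] struct {
	elems []T
	idx   int
}

func (s *sliceIter[T]) Valid() (bool, error) { return s.idx < len(s.elems), nil }
func (s *sliceIter[T]) Value() T {
	if s.idx < len(s.elems) {
		return s.elems[s.idx]
	}
	var zero T
	return zero // zero value after exhaustion; never panics
}
func (s *sliceIter[T]) Next()  { s.idx++ }
func (s *sliceIter[T]) Close() {}

func main() {
	var it Iterator[int] = &sliceIter[int]{elems: []int{1, 2, 3}}
	defer it.Close()
	// The canonical consumption loop, as written in backup() above.
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			panic(err)
		} else if !ok {
			break
		}
		fmt.Println(it.Value())
	}
}
```

The `for ; ; it.Next()` shape keeps error checking in a single place, which is why the same loop body recurs throughout this diff.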
- if err := backupinfo.WriteFilesListMetadataWithSSTs(ctx, defaultStore, encryption, - &kmsEnv, backupManifest); err != nil { - return roachpb.RowCount{}, err + if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) { + if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption, + &kmsEnv, backupManifest); err != nil { + return roachpb.RowCount{}, err + } } var tableStatistics []*stats.TableStatisticProto @@ -921,12 +923,19 @@ func getBackupDetailAndManifest( return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err } + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv) + if err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + backupManifest, err := createBackupManifest( ctx, execCfg, txn, updatedDetails, - prevBackups) + prevBackups, + layerToIterFactory, + ) if err != nil { return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err } diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index 60f736c85360..b15df4d52e69 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -26,8 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -36,6 +36,8 @@ import ( "github.com/stretchr/testify/require" ) +// TestMetadataSST has to be in backupccl_test in order to be sure that the +// BACKUP planhook is registered. func TestMetadataSST(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -112,14 +114,8 @@ func checkMetadata( } checkManifest(t, m, bm) - // If there are descriptor changes, we only check those as they should have - // all changes as well as existing descriptors - if len(m.DescriptorChanges) > 0 { - checkDescriptorChanges(ctx, t, m, bm) - } else { - checkDescriptors(ctx, t, m, bm) - } - + checkDescriptorChanges(ctx, t, m, bm) + checkDescriptors(ctx, t, m, bm) checkSpans(ctx, t, m, bm) // Don't check introduced spans on the first backup. 
if m.StartTime != (hlc.Timestamp{}) { @@ -147,16 +143,17 @@ func checkDescriptors( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaDescs []descpb.Descriptor - var desc descpb.Descriptor - it := bm.DescIter(ctx) + it := bm.NewDescIter(ctx) defer it.Close() - for it.Next(&desc) { - metaDescs = append(metaDescs, desc) - } + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } - if it.Err() != nil { - t.Fatal(it.Err()) + metaDescs = append(metaDescs, *it.Value()) } require.Equal(t, m.Descriptors, metaDescs) @@ -166,15 +163,16 @@ func checkDescriptorChanges( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaRevs []backuppb.BackupManifest_DescriptorRevision - var rev backuppb.BackupManifest_DescriptorRevision - it := bm.DescriptorChangesIter(ctx) + it := bm.NewDescriptorChangesIter(ctx) defer it.Close() - for it.Next(&rev) { - metaRevs = append(metaRevs, rev) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + metaRevs = append(metaRevs, *it.Value()) } // Descriptor Changes are sorted by time in the manifest. @@ -214,15 +212,17 @@ func checkSpans( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaSpans []roachpb.Span - var span roachpb.Span - it := bm.SpanIter(ctx) + it := bm.NewSpanIter(ctx) defer it.Close() - for it.Next(&span) { - metaSpans = append(metaSpans, span) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + metaSpans = append(metaSpans, it.Value()) } require.Equal(t, m.Spans, metaSpans) @@ -232,14 +232,16 @@ func checkIntroducedSpans( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaSpans []roachpb.Span - var span roachpb.Span - it := bm.IntroducedSpanIter(ctx) + it := bm.NewIntroducedSpanIter(ctx) defer it.Close() - for it.Next(&span) { - metaSpans = append(metaSpans, span) - } - if it.Err() != nil { - t.Fatal(it.Err()) + + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + metaSpans = append(metaSpans, it.Value()) } require.Equal(t, m.IntroducedSpans, metaSpans) @@ -249,15 +251,17 @@ func checkTenants( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaTenants []descpb.TenantInfoWithUsage - var tenant descpb.TenantInfoWithUsage - it := bm.TenantIter(ctx) + it := bm.NewTenantIter(ctx) defer it.Close() - for it.Next(&tenant) { - metaTenants = append(metaTenants, tenant) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + metaTenants = append(metaTenants, it.Value()) } require.Equal(t, m.Tenants, metaTenants) @@ -275,18 +279,17 @@ func checkStats( if err != nil { t.Fatal(err) } + if len(expectedStats) == 0 { + expectedStats = nil + } - var metaStats = make([]*stats.TableStatisticProto, 0) - var s *stats.TableStatisticProto - it := bm.StatsIter(ctx) + it := bm.NewStatsIter(ctx) defer it.Close() - - for it.Next(&s) { - metaStats = append(metaStats, s) - } - if it.Err() != nil { - t.Fatal(it.Err()) + metaStats, err := bulk.CollectToSlice(it) + if err != nil { + t.Fatal(err) } + 
require.Equal(t, expectedStats, metaStats) } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 3f1161a8efa0..33041ec6944e 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1053,6 +1053,7 @@ func getReintroducedSpans( ctx context.Context, execCfg *sql.ExecutorConfig, prevBackups []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, tables []catalog.TableDescriptor, revs []backuppb.BackupManifest_DescriptorRevision, endTime hlc.Timestamp, @@ -1078,12 +1079,21 @@ func getReintroducedSpans( // at backup time, we must find all tables in manifest.DescriptorChanges whose // last change brought the table offline. offlineInLastBackup := make(map[descpb.ID]struct{}) - lastBackup := prevBackups[len(prevBackups)-1] + lastIterFactory := layerToIterFactory[len(prevBackups)-1] + + descIt := lastIterFactory.NewDescIter(ctx) + defer descIt.Close() + + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return nil, err + } else if !ok { + break + } - for _, desc := range lastBackup.Descriptors { // TODO(pbardea): Also check that lastWriteTime is set once those are // populated on the table descriptor. - if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Offline() { + if table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); table != nil && table.Offline() { offlineInLastBackup[table.GetID()] = struct{}{} } } @@ -1093,8 +1103,16 @@ func getReintroducedSpans( // change in the previous backup interval put the table offline, then that // backup was offline at the endTime of the last backup. latestTableDescChangeInLastBackup := make(map[descpb.ID]*descpb.TableDescriptor) - for _, rev := range lastBackup.DescriptorChanges { - if table, _, _, _, _ := descpb.GetDescriptors(rev.Desc); table != nil { + descRevIt := lastIterFactory.NewDescriptorChangesIter(ctx) + defer descRevIt.Close() + for ; ; descRevIt.Next() { + if ok, err := descRevIt.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + if table, _, _, _, _ := descpb.GetDescriptors(descRevIt.Value().Desc); table != nil { if trackedRev, ok := latestTableDescChangeInLastBackup[table.GetID()]; !ok { latestTableDescChangeInLastBackup[table.GetID()] = table } else if trackedRev.Version < table.Version { @@ -1341,6 +1359,7 @@ func createBackupManifest( txn *kv.Txn, jobDetails jobspb.BackupDetails, prevBackups []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, ) (backuppb.BackupManifest, error) { mvccFilter := backuppb.MVCCFilter_Latest if jobDetails.RevisionHistory { @@ -1414,9 +1433,17 @@ func createBackupManifest( if len(prevBackups) > 0 { tablesInPrev := make(map[descpb.ID]struct{}) dbsInPrev := make(map[descpb.ID]struct{}) - rawDescs := prevBackups[len(prevBackups)-1].Descriptors - for i := range rawDescs { - if t, _, _, _, _ := descpb.GetDescriptors(&rawDescs[i]); t != nil { + + descIt := layerToIterFactory[len(prevBackups)-1].NewDescIter(ctx) + defer descIt.Close() + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return backuppb.BackupManifest{}, err + } else if !ok { + break + } + + if t, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); t != nil { tablesInPrev[t.ID] = struct{}{} } } @@ -1437,7 +1464,7 @@ func createBackupManifest( newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans) - reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, 
endTime) + reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, layerToIterFactory, tables, revs, endTime) if err != nil { return backuppb.BackupManifest{}, err } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 6fa6318d3a6a..be7ffd603db0 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -8132,8 +8132,8 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { } // TestIncorrectAccessOfFilesInBackupMetadata ensures that an accidental use of -// the `Files` field (instead of its dedicated SST) on the `BACKUP_METADATA` -// results in an error on restore and show. +// the `Descriptors` field (instead of its dedicated SST) on the +// `BACKUP_METADATA` results in an error on restore and show. func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -8158,13 +8158,13 @@ func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { var backupManifest backuppb.BackupManifest require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest)) - // The manifest should have `HasExternalFilesList` set to true. - require.True(t, backupManifest.HasExternalFilesList) + // The manifest should have `HasExternalManifestSSTs` set to true. + require.True(t, backupManifest.HasExternalManifestSSTs) // Set it to false, so that any operation that resolves the metadata treats // this manifest as a pre-23.1 BACKUP_MANIFEST, and directly accesses the - // `Files` field, instead of reading from the external SST. - backupManifest.HasExternalFilesList = false + // `Descriptors` field, instead of reading from the external SST. + backupManifest.HasExternalManifestSSTs = false manifestData, err = protoutil.Marshal(&backupManifest) require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath, manifestData, 0644 /* perm */)) @@ -8174,7 +8174,7 @@ func TestIncorrectAccessOfFilesInBackupMetadata(t *testing.T) { require.NoError(t, os.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Expect an error on restore. 
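The expectation just below relies on the "tripwire" placeholders that writeExternalSSTsMetadata (later in this diff) plants in the slim manifest. A rough sketch of the pattern with hypothetical names; the real failure path goes through descriptor post-deserialization checks, which are not modeled here:

```go
package main

import (
	"errors"
	"fmt"
)

// Manifest stands in for the slim BACKUP_METADATA: the heavy fields are
// replaced by a sentinel that fails loudly if legacy code paths read it.
type Manifest struct {
	Files                   []string
	HasExternalManifestSSTs bool
}

// writeSlim nils out the real Files and plants a placeholder, mirroring
// writeExternalSSTsMetadata's bogusFile/bogusTableDesc approach.
func writeSlim(m Manifest) Manifest {
	m.Files = []string{"assertion: this placeholder legacy Files entry should never be opened"}
	m.HasExternalManifestSSTs = true
	return m
}

// legacyOpen is a stand-in for a pre-23.1 code path that reads Files directly.
func legacyOpen(m Manifest) error {
	if m.HasExternalManifestSSTs {
		return errors.New("caller should read the external SST, not Files")
	}
	// If the flag was (incorrectly) cleared, the sentinel is what gets opened,
	// which is exactly the failure the test below provokes.
	return fmt.Errorf("opening %q", m.Files[0])
}

func main() {
	slim := writeSlim(Manifest{Files: []string{"real1.sst", "real2.sst"}})
	slim.HasExternalManifestSSTs = false // what the test does to the on-disk manifest
	fmt.Println(legacyOpen(slim))
}
```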
- sqlDB.ExpectErr(t, "assertion: this placeholder legacy Files entry should never be opened", `RESTORE DATABASE r1 FROM LATEST IN 'nodelocal://0/test' WITH new_db_name = 'r2'`) + sqlDB.ExpectErr(t, "assertion: this placeholder legacy Descriptor entry should never be used", `RESTORE DATABASE r1 FROM LATEST IN 'nodelocal://0/test' WITH new_db_name = 'r2'`) } func TestManifestTooNew(t *testing.T) { @@ -8285,7 +8285,7 @@ func flipBitInManifests(t *testing.T, rawDir string) { foundManifest := false err := filepath.Walk(rawDir, func(path string, info os.FileInfo, err error) error { log.Infof(context.Background(), "visiting %s", path) - if filepath.Base(path) == backupbase.BackupMetadataName { + if filepath.Base(path) == backupbase.BackupManifestName { foundManifest = true data, err := os.ReadFile(path) require.NoError(t, err) diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel index 2bf6b1cb2bd6..c642036cfba7 100644 --- a/pkg/ccl/backupccl/backupinfo/BUILD.bazel +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/sql/stats", "//pkg/storage", "//pkg/util", + "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", @@ -55,17 +56,33 @@ go_library( go_test( name = "backupinfo_test", - srcs = ["main_test.go"], + srcs = [ + "main_test.go", + "manifest_handling_test.go", + ], args = ["-test.timeout=295s"], embed = [":backupinfo"], deps = [ + "//pkg/base", + "//pkg/blobs", + "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/utilccl", + "//pkg/cloud", + "//pkg/cloud/impl:cloudimpl", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", + "//pkg/sql", + "//pkg/sql/catalog/descpb", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", + "//pkg/util/bulk", + "//pkg/util/hlc", + "//pkg/util/leaktest", "//pkg/util/randutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index 9b45d71ab3bb..a6a136e5d933 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -41,6 +42,11 @@ const ( // BackupManifest_Files of the backup. This file is always written in // conjunction with the `BACKUP_METADATA`. BackupMetadataFilesListPath = "filelist.sst" + // BackupMetadataDescriptorsListPath is the name of the SST file containing + // the BackupManifest_Descriptors or BackupManifest_DescriptorRevisions of the + // backup. This file is always written in conjunction with the + // `BACKUP_METADATA`. + BackupMetadataDescriptorsListPath = "descriptorslist.sst" // FileInfoPath is the name of the SST file containing the // BackupManifest_Files of the backup. FileInfoPath = "fileinfo.sst" @@ -200,18 +206,25 @@ func writeManifestToMetadata( return sst.PutUnversioned(roachpb.Key(sstBackupKey), b) } +// DescChangesLess gives an ordering to two BackupManifest_DescriptorRevision. 
+func DescChangesLess( + left *backuppb.BackupManifest_DescriptorRevision, + right *backuppb.BackupManifest_DescriptorRevision, +) bool { + if left.ID != right.ID { + return left.ID < right.ID + } + + return !left.Time.Less(right.Time) +} + func writeDescsToMetadata( ctx context.Context, sst storage.SSTWriter, m *backuppb.BackupManifest, ) error { // Add descriptors from revisions if available, Descriptors if not. if len(m.DescriptorChanges) > 0 { sort.Slice(m.DescriptorChanges, func(i, j int) bool { - if m.DescriptorChanges[i].ID < m.DescriptorChanges[j].ID { - return true - } else if m.DescriptorChanges[i].ID == m.DescriptorChanges[j].ID { - return !m.DescriptorChanges[i].Time.Less(m.DescriptorChanges[j].Time) - } - return false + return DescChangesLess(&m.DescriptorChanges[i], &m.DescriptorChanges[j]) }) for _, i := range m.DescriptorChanges { k := encodeDescSSTKey(i.ID) @@ -248,7 +261,7 @@ func writeDescsToMetadata( // changes in an incremental backup, it's helpful to have existing // descriptors at the start time, so we don't have to look back further // than the very last backup. - if m.StartTime.IsEmpty() { + if m.StartTime.IsEmpty() || m.MVCCFilter == backuppb.MVCCFilter_Latest { if err := sst.PutUnversioned(k, b); err != nil { return err } @@ -262,6 +275,39 @@ func writeDescsToMetadata( return nil } +// WriteDescsSST is responsible for writing the SST containing the Descriptor +// and DescriptorChanges field of the input BackupManifest. If DescriptorChanges +// is non-empty, then the descriptor changes will be written to the SST with the +// MVCC timestamp equal to the revision time. Otherwise, contents of the +// Descriptors field will be written to the SST with an empty MVCC timestamp. +func WriteDescsSST( + ctx context.Context, + m *backuppb.BackupManifest, + dest cloud.ExternalStorage, + enc *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, + path string, +) error { + w, err := makeWriter(ctx, dest, path, enc, kmsEnv) + if err != nil { + return err + } + defer w.Close() + descSST := storage.MakeBackupSSTWriter(ctx, dest.Settings(), w) + defer descSST.Close() + + if err := writeDescsToMetadata(ctx, descSST, m); err != nil { + return err + } + + if err := descSST.Finish(); err != nil { + return err + } + + return w.Close() +} + +// FileCmp gives an ordering to two backuppb.BackupManifest_File. func FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int { if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 { return cmp @@ -931,29 +977,34 @@ func NewBackupMetadata( type SpanIterator struct { backing bytesIter filter func(key storage.MVCCKey) bool + value *roachpb.Span err error } -// SpanIter creates a new SpanIterator for the backup metadata. -func (b *BackupMetadata) SpanIter(ctx context.Context) SpanIterator { +// NewSpanIter creates a new SpanIterator for the backup metadata. +func (b *BackupMetadata) NewSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, true, b.kmsEnv) - return SpanIterator{ + it := SpanIterator{ backing: backing, } + it.Next() + return &it } -// IntroducedSpanIter creates a new IntroducedSpanIterator for the backup metadata. -func (b *BackupMetadata) IntroducedSpanIter(ctx context.Context) SpanIterator { +// NewIntroducedSpanIter creates a new IntroducedSpanIterator for the backup metadata. 
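NewIntroducedSpanIter below reuses SpanIterator with a key filter: introduced spans are written at an empty MVCC timestamp, so matching on a zero timestamp selects exactly them. A generic sketch of that filtered, primed iteration; illustrative only, since the real iterator walks SST bytes rather than a slice:

```go
package main

import "fmt"

// filterIter wraps a source slice with a predicate, echoing how SpanIterator
// applies its optional MVCC-key filter inside Next().
type filterIter[T any] struct {
	src   []T
	keep  func(T) bool
	value *T
}

func newFilterIter[T any](src []T, keep func(T) bool) *filterIter[T] {
	it := &filterIter[T]{src: src, keep: keep}
	it.Next() // prime, like the New*Iter constructors in this diff
	return it
}

func (f *filterIter[T]) Valid() (bool, error) { return f.value != nil, nil }
func (f *filterIter[T]) Value() T {
	if f.value == nil {
		var zero T
		return zero
	}
	return *f.value
}
func (f *filterIter[T]) Next() {
	f.value = nil
	for len(f.src) > 0 {
		cand := f.src[0]
		f.src = f.src[1:]
		if f.keep == nil || f.keep(cand) {
			f.value = &cand
			return
		}
	}
}
func (f *filterIter[T]) Close() {}

func main() {
	ts := []int{0, 5, 0, 7} // 0 plays the role of an empty MVCC timestamp
	it := newFilterIter(ts, func(t int) bool { return t == 0 })
	defer it.Close()
	for ; ; it.Next() {
		if ok, _ := it.Valid(); !ok {
			break
		}
		fmt.Println(it.Value()) // prints the two zero-timestamp entries
	}
}
```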
+func (b *BackupMetadata) NewIntroducedSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, false, b.kmsEnv) - return SpanIterator{ + it := SpanIterator{ backing: backing, filter: func(key storage.MVCCKey) bool { return key.Timestamp == hlc.Timestamp{} }, } + it.Next() + return &it } // Close closes the iterator. @@ -961,37 +1012,41 @@ func (si *SpanIterator) Close() { si.backing.close() } -// Err returns the iterator's error -func (si *SpanIterator) Err() error { +// Valid implements the Iterator interface. +func (si *SpanIterator) Valid() (bool, error) { if si.err != nil { - return si.err + return false, si.err } - return si.backing.err() + return si.value != nil, si.err } -// Next retrieves the next span in the iterator. -// -// Next returns true if next element was successfully unmarshalled into span, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (si *SpanIterator) Next(span *roachpb.Span) bool { +// Value implements the Iterator interface. +func (si *SpanIterator) Value() roachpb.Span { + if si.value == nil { + return roachpb.Span{} + } + return *si.value +} + +// Next implements the Iterator interface. +func (si *SpanIterator) Next() { wrapper := resultWrapper{} + var nextSpan *roachpb.Span for si.backing.next(&wrapper) { if si.filter == nil || si.filter(wrapper.key) { sp, err := decodeSpanSSTKey(wrapper.key.Key) if err != nil { si.err = err - return false + return } - *span = sp - return true + nextSpan = &sp + break } } - return false + si.value = nextSpan } // FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File. @@ -1002,7 +1057,9 @@ type FileIterator struct { } // NewFileIter creates a new FileIterator for the backup metadata. -func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) { +func (b *BackupMetadata) NewFileIter( + ctx context.Context, +) (bulk.Iterator[*backuppb.BackupManifest_File], error) { fileInfoIter := makeBytesIter(ctx, b.store, b.filename, []byte(sstFilesPrefix), b.enc, false, b.kmsEnv) defer fileInfoIter.close() @@ -1030,13 +1087,7 @@ func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) if fileInfoIter.err() != nil { return nil, fileInfoIter.err() } - - iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encOpts, iterOpts) - if err != nil { - return nil, err - } - iter.SeekGE(storage.MVCCKey{}) - return &FileIterator{mergedIterator: iter}, nil + return newFileSSTIter(ctx, storeFiles, encOpts) } // NewFileSSTIter creates a new FileIterator to iterate over the storeFile. 
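The FileIterator rework in the next hunk moves proto decoding into Next() and latches any failure so later Valid() calls keep reporting it. A toy model of that error latching, with string parsing standing in for proto unmarshalling:

```go
package main

import (
	"errors"
	"fmt"
)

// latchIter shows the error latching the reworked FileIterator uses: once
// decoding fails, the error is stored, every later Valid() reports it, and
// Next() becomes a no-op.
type latchIter struct {
	raw   []string
	value *int
	err   error
}

func newLatchIter(raw []string) *latchIter {
	it := &latchIter{raw: raw}
	it.Next() // prime the first element
	return it
}

func (l *latchIter) Valid() (bool, error) {
	if l.err != nil {
		return false, l.err
	}
	return l.value != nil, nil
}
func (l *latchIter) Value() *int { return l.value }
func (l *latchIter) Next() {
	if l.err != nil {
		return // latched: never advance past an error
	}
	if len(l.raw) == 0 {
		l.value = nil
		return
	}
	var v int
	if _, err := fmt.Sscanf(l.raw[0], "%d", &v); err != nil {
		l.err = errors.New("decode failed: " + l.raw[0])
		return
	}
	l.raw = l.raw[1:]
	l.value = &v
}
func (l *latchIter) Close() {}

func main() {
	it := newLatchIter([]string{"1", "oops", "3"})
	defer it.Close()
	for ; ; it.Next() {
		ok, err := it.Valid()
		if err != nil {
			fmt.Println("stopped:", err) // the decode error surfaces via Valid
			return
		}
		if !ok {
			break
		}
		fmt.Println(*it.Value())
	}
}
```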
@@ -1044,12 +1095,20 @@ func (b *BackupMetadata) NewFileIter(ctx context.Context) (*FileIterator, error) func NewFileSSTIter( ctx context.Context, storeFile storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, ) (*FileIterator, error) { - iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{storeFile}, encOpts, iterOpts) + return newFileSSTIter(ctx, []storageccl.StoreFile{storeFile}, encOpts) +} + +func newFileSSTIter( + ctx context.Context, storeFiles []storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, +) (*FileIterator, error) { + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encOpts, iterOpts) if err != nil { return nil, err } iter.SeekGE(storage.MVCCKey{}) - return &FileIterator{mergedIterator: iter}, nil + fi := &FileIterator{mergedIterator: iter} + fi.Next() + return fi, nil } // Close closes the iterator. @@ -1063,55 +1122,55 @@ func (fi *FileIterator) Valid() (bool, error) { return false, fi.err } - if ok, err := fi.mergedIterator.Valid(); !ok { - fi.err = err - return ok, err + return fi.file != nil, nil +} + +// Value implements the Iterator interface. +func (fi *FileIterator) Value() *backuppb.BackupManifest_File { + return fi.file +} + +// Next implements the Iterator interface. +func (fi *FileIterator) Next() { + if fi.err != nil { + return } - if fi.file == nil { - v := fi.mergedIterator.UnsafeValue() - file := &backuppb.BackupManifest_File{} - err := protoutil.Unmarshal(v, file) + if ok, err := fi.mergedIterator.Valid(); !ok { if err != nil { fi.err = err - return false, fi.err } - fi.file = file + fi.file = nil + return } - return true, nil -} -// Value returns the current value of the iterator, if valid. -func (fi *FileIterator) Value() *backuppb.BackupManifest_File { - return fi.file -} + v := fi.mergedIterator.UnsafeValue() + file := &backuppb.BackupManifest_File{} + err := protoutil.Unmarshal(v, file) + if err != nil { + fi.err = err + return + } -// Next advances the iterator the the next value. -func (fi *FileIterator) Next() { + fi.file = file fi.mergedIterator.Next() - fi.file = nil -} - -// Reset resets the iterator to the first value. -func (fi *FileIterator) Reset() { - fi.mergedIterator.SeekGE(storage.MVCCKey{}) - fi.err = nil - fi.file = nil } // DescIterator is a simple iterator to iterate over descpb.Descriptors. type DescIterator struct { backing bytesIter + value *descpb.Descriptor err error } -// DescIter creates a new DescIterator for the backup metadata. -func (b *BackupMetadata) DescIter(ctx context.Context) DescIterator { - backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, - true, b.kmsEnv) - return DescIterator{ +// NewDescIter creates a new DescIterator for the backup metadata. +func (b *BackupMetadata) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] { + backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, true, b.kmsEnv) + it := DescIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1119,52 +1178,63 @@ func (di *DescIterator) Close() { di.backing.close() } -// Err returns the iterator's error. -func (di *DescIterator) Err() error { +// Valid implements the Iterator interface. +func (di *DescIterator) Valid() (bool, error) { if di.err != nil { - return di.err + return false, di.err } - return di.backing.err() + return di.value != nil, nil } -// Next retrieves the next descriptor in the iterator. 
-// -// Next returns true if next element was successfully unmarshalled into desc , -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (di *DescIterator) Next(desc *descpb.Descriptor) bool { - wrapper := resultWrapper{} +// Value implements the Iterator interface. +func (di *DescIterator) Value() *descpb.Descriptor { + return di.value +} + +// Next implements the Iterator interface. +func (di *DescIterator) Next() { + if di.err != nil { + return + } + wrapper := resultWrapper{} + var nextValue *descpb.Descriptor + descHolder := descpb.Descriptor{} for di.backing.next(&wrapper) { - err := protoutil.Unmarshal(wrapper.value, desc) + err := protoutil.Unmarshal(wrapper.value, &descHolder) if err != nil { di.err = err - return false + return } - tbl, db, typ, sc, fn := descpb.GetDescriptors(desc) + tbl, db, typ, sc, fn := descpb.GetDescriptors(&descHolder) if tbl != nil || db != nil || typ != nil || sc != nil || fn != nil { - return true + nextValue = &descHolder + break } } - return false + di.value = nextValue } // TenantIterator is a simple iterator to iterate over TenantInfoWithUsages. type TenantIterator struct { backing bytesIter + value *descpb.TenantInfoWithUsage err error } -// TenantIter creates a new TenantIterator for the backup metadata. -func (b *BackupMetadata) TenantIter(ctx context.Context) TenantIterator { +// NewTenantIter creates a new TenantIterator for the backup metadata. +func (b *BackupMetadata) NewTenantIter( + ctx context.Context, +) bulk.Iterator[descpb.TenantInfoWithUsage] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstTenantsPrefix), b.enc, false, b.kmsEnv) - return TenantIterator{ + it := TenantIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1172,62 +1242,91 @@ func (ti *TenantIterator) Close() { ti.backing.close() } -// Err returns the iterator's error. -func (ti *TenantIterator) Err() error { +// Valid implements the Iterator interface. +func (ti *TenantIterator) Valid() (bool, error) { if ti.err != nil { - return ti.err + return false, ti.err } - return ti.backing.err() + return ti.value != nil, nil } -// Next retrieves the next tenant in the iterator. -// -// Next returns true if next element was successfully unmarshalled into tenant, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (ti *TenantIterator) Next(tenant *descpb.TenantInfoWithUsage) bool { +// Value implements the Iterator interface. +func (ti *TenantIterator) Value() descpb.TenantInfoWithUsage { + if ti.value == nil { + return descpb.TenantInfoWithUsage{} + } + return *ti.value +} + +// Next implements the Iterator interface. +func (ti *TenantIterator) Next() { + if ti.err != nil { + return + } + wrapper := resultWrapper{} ok := ti.backing.next(&wrapper) if !ok { - return false + if ti.backing.err() != nil { + ti.err = ti.backing.err() + } + ti.value = nil + return } - err := protoutil.Unmarshal(wrapper.value, tenant) + tenant := descpb.TenantInfoWithUsage{} + + err := protoutil.Unmarshal(wrapper.value, &tenant) if err != nil { ti.err = err - return false + return } - return true + ti.value = &tenant } // DescriptorRevisionIterator is a simple iterator to iterate over backuppb.BackupManifest_DescriptorRevisions. 
type DescriptorRevisionIterator struct { backing bytesIter err error + value *backuppb.BackupManifest_DescriptorRevision } -// DescriptorChangesIter creates a new DescriptorChangesIterator for the backup metadata. -func (b *BackupMetadata) DescriptorChangesIter(ctx context.Context) DescriptorRevisionIterator { +// NewDescriptorChangesIter creates a new DescriptorChangesIterator for the backup metadata. +func (b *BackupMetadata) NewDescriptorChangesIter( + ctx context.Context, +) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] { + if b.MVCCFilter == backuppb.MVCCFilter_Latest { + var backing []backuppb.BackupManifest_DescriptorRevision + return newSlicePointerIterator(backing) + } + backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, false, b.kmsEnv) - return DescriptorRevisionIterator{ + dri := DescriptorRevisionIterator{ backing: backing, } -} -// Close closes the iterator. -func (dri *DescriptorRevisionIterator) Close() { - dri.backing.close() + dri.Next() + return &dri } -// Err returns the iterator's error. -func (dri *DescriptorRevisionIterator) Err() error { +// Valid implements the Iterator interface. +func (dri *DescriptorRevisionIterator) Valid() (bool, error) { if dri.err != nil { - return dri.err + return false, dri.err } - return dri.backing.err() + return dri.value != nil, nil +} + +// Value implements the Iterator interface. +func (dri *DescriptorRevisionIterator) Value() *backuppb.BackupManifest_DescriptorRevision { + return dri.value +} + +// Close closes the iterator. +func (dri *DescriptorRevisionIterator) Close() { + dri.backing.close() } // Next retrieves the next descriptor revision in the iterator. @@ -1236,62 +1335,72 @@ func (dri *DescriptorRevisionIterator) Err() error { // revision, and false if there are no more elements or if an error was // encountered. When Next returns false, the user should call the Err method to // verify the existence of an error. -func (dri *DescriptorRevisionIterator) Next( - revision *backuppb.BackupManifest_DescriptorRevision, -) bool { +func (dri *DescriptorRevisionIterator) Next() { + if dri.err != nil { + return + } + wrapper := resultWrapper{} ok := dri.backing.next(&wrapper) if !ok { - return false + if err := dri.backing.err(); err != nil { + dri.err = err + } + + dri.value = nil + return } - err := unmarshalWrapper(&wrapper, revision) + nextRev, err := unmarshalWrapper(&wrapper) if err != nil { dri.err = err - return false + return } - return true + dri.value = &nextRev } -func unmarshalWrapper( - wrapper *resultWrapper, rev *backuppb.BackupManifest_DescriptorRevision, -) error { +func unmarshalWrapper(wrapper *resultWrapper) (backuppb.BackupManifest_DescriptorRevision, error) { var desc *descpb.Descriptor if len(wrapper.value) > 0 { desc = &descpb.Descriptor{} err := protoutil.Unmarshal(wrapper.value, desc) if err != nil { - return err + return backuppb.BackupManifest_DescriptorRevision{}, err } } id, err := decodeDescSSTKey(wrapper.key.Key) if err != nil { - return err + return backuppb.BackupManifest_DescriptorRevision{}, err } - *rev = backuppb.BackupManifest_DescriptorRevision{ + rev := backuppb.BackupManifest_DescriptorRevision{ Desc: desc, ID: id, Time: wrapper.key.Timestamp, } - return nil + return rev, nil } // StatsIterator is a simple iterator to iterate over stats.TableStatisticProtos. type StatsIterator struct { backing bytesIter + value *stats.TableStatisticProto err error } -// StatsIter creates a new StatsIterator for the backup metadata. 
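Because StatsIterator below now satisfies the same interface, generic helpers apply; checkStats earlier in this diff switched to bulk.CollectToSlice. A re-derivation of what such a helper reduces to over the shared contract (pkg/util/bulk ships the real one; this version and its package name are only a sketch):

```go
package iterutil

// Iterator matches the interface shape used by the new backup iterators.
type Iterator[T any] interface {
	Valid() (bool, error)
	Value() T
	Next()
	Close()
}

// CollectToSlice drains an iterator into a slice; closing is left to the
// caller. The Valid/Value/Next contract is enough to write such utilities
// once instead of repeating the manual Next(&v)/Err() loops being deleted
// throughout this diff.
func CollectToSlice[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return nil, err
		} else if !ok {
			return out, nil
		}
		out = append(out, it.Value())
	}
}
```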
-func (b *BackupMetadata) StatsIter(ctx context.Context) StatsIterator { +// NewStatsIter creates a new StatsIterator for the backup metadata. +func (b *BackupMetadata) NewStatsIter( + ctx context.Context, +) bulk.Iterator[*stats.TableStatisticProto] { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstStatsPrefix), b.enc, false, b.kmsEnv) - return StatsIterator{ + it := StatsIterator{ backing: backing, } + it.Next() + return &it } // Close closes the iterator. @@ -1299,37 +1408,44 @@ func (si *StatsIterator) Close() { si.backing.close() } -// Err returns the iterator's error. -func (si *StatsIterator) Err() error { +// Valid implements the Iterator interface. +func (si *StatsIterator) Valid() (bool, error) { if si.err != nil { - return si.err + return false, si.err } - return si.backing.err() + return si.value != nil, nil } -// Next retrieves the next stats proto in the iterator. -// -// Next returns true if next element was successfully unmarshalled into -// statsPtr, and false if there are no more elements or if an error was -// encountered. When Next returns false, the user should call the Err method to verify the -// existence of an error. -func (si *StatsIterator) Next(statsPtr **stats.TableStatisticProto) bool { +// Value implements the Iterator interface. +func (si *StatsIterator) Value() *stats.TableStatisticProto { + return si.value +} + +// Next implements the Iterator interface. +func (si *StatsIterator) Next() { + if si.err != nil { + return + } + wrapper := resultWrapper{} ok := si.backing.next(&wrapper) if !ok { - return false + if err := si.backing.err(); err != nil { + si.err = err + } + si.value = nil + return } var s stats.TableStatisticProto err := protoutil.Unmarshal(wrapper.value, &s) if err != nil { si.err = err - return false + return } - *statsPtr = &s - return true + si.value = &s } type bytesIter struct { @@ -1379,7 +1495,6 @@ func (bi *bytesIter) next(resWrapper *resultWrapper) bool { valid, err := bi.Iter.Valid() if err != nil || !valid || !bytes.HasPrefix(bi.Iter.UnsafeKey().Key, bi.prefix) { - bi.close() bi.iterError = err return false } @@ -1413,3 +1528,35 @@ type resultWrapper struct { key storage.MVCCKey value []byte } + +type sliceIterator[T any] struct { + backingSlice []T + idx int +} + +var _ bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] = &sliceIterator[backuppb.BackupManifest_DescriptorRevision]{} + +func newSlicePointerIterator[T any](backing []T) *sliceIterator[T] { + return &sliceIterator[T]{ + backingSlice: backing, + } +} + +func (s *sliceIterator[T]) Valid() (bool, error) { + return s.idx < len(s.backingSlice), nil +} + +func (s *sliceIterator[T]) Value() *T { + if s.idx < len(s.backingSlice) { + return &s.backingSlice[s.idx] + } + + return nil +} + +func (s *sliceIterator[T]) Next() { + s.idx++ +} + +func (s *sliceIterator[T]) Close() { +} diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index e43a178899a8..ca5e30497b3a 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -15,7 +15,6 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "github.com/cockroachdb/cockroach/pkg/util" "path" "sort" "strconv" @@ -45,6 +44,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/bulk" 
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -178,7 +179,7 @@ func ReadBackupManifestFromStore( backupbase.BackupManifestName, encryption, kmsEnv) if backupManifestErr != nil { if !errors.Is(backupManifestErr, cloud.ErrFileDoesNotExist) { - return backuppb.BackupManifest{}, 0, err + return backuppb.BackupManifest{}, 0, backupManifestErr } // If we did not find a `BACKUP_MANIFEST` we look for a `BACKUP` file as @@ -190,11 +191,11 @@ func ReadBackupManifestFromStore( backupbase.BackupOldManifestName, encryption, kmsEnv) if oldBackupManifestErr != nil { return backuppb.BackupManifest{}, 0, oldBackupManifestErr - } else { - // We found a `BACKUP` manifest file. - manifest = oldBackupManifest - memSize = oldBackupManifestMemSize } + + // We found a `BACKUP` manifest file. + manifest = oldBackupManifest + memSize = oldBackupManifestMemSize } else { // We found a `BACKUP_MANIFEST` file. manifest = backupManifest @@ -561,28 +562,35 @@ func WriteBackupLock( return cloud.WriteFile(ctx, defaultStore, lockFileName, bytes.NewReader([]byte("lock"))) } -// WriteFilesListMetadataWithSSTs writes a "slim" version of manifest -// to `exportStore`. This version has the alloc heavy `Files` repeated field -// nil'ed out, and written to an accompanying SST instead. -func WriteFilesListMetadataWithSSTs( +// WriteMetadataWithExternalSSTs writes a "slim" version of manifest to +// `exportStore`. This version has the alloc heavy `Files`, `Descriptors`, and +// `DescriptorChanges` repeated fields nil'ed out, and written to an +// accompanying SST instead. +func WriteMetadataWithExternalSSTs( ctx context.Context, exportStore cloud.ExternalStorage, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, manifest *backuppb.BackupManifest, ) error { - if err := writeFilesListMetadata(ctx, exportStore, backupbase.BackupMetadataName, encryption, - kmsEnv, manifest); err != nil { - return errors.Wrap(err, "failed to write the backup metadata with external Files list") + if err := WriteFilesListSST(ctx, exportStore, encryption, kmsEnv, manifest, + BackupMetadataFilesListPath); err != nil { + return errors.Wrap(err, "failed to write backup metadata Files SST") + } + + if err := WriteDescsSST(ctx, manifest, exportStore, encryption, kmsEnv, BackupMetadataDescriptorsListPath); err != nil { + return errors.Wrap(err, "failed to write backup metadata descriptors SST") } - return errors.Wrap(WriteFilesListSST(ctx, exportStore, encryption, kmsEnv, manifest, - BackupMetadataFilesListPath), "failed to write backup metadata Files SST") + + return errors.Wrap(writeExternalSSTsMetadata(ctx, exportStore, backupbase.BackupMetadataName, encryption, + kmsEnv, manifest), "failed to write the backup metadata with external Files list") } -// writeFilesListMetadata compresses and writes a slimmer version of the -// BackupManifest `desc` to `exportStore` with the `Files` field of the proto -// set to a bogus value that will error out on incorrect use. -func writeFilesListMetadata( +// writeExternalSSTsMetadata compresses and writes a slimmer version of the +// BackupManifest `desc` to `exportStore` with the `Files`, `Descriptors`, and +// `DescriptorChanges` fields of the proto set to bogus values that will error +// out on incorrect use. 
+func writeExternalSSTsMetadata(
 	ctx context.Context,
 	exportStore cloud.ExternalStorage,
 	filename string,
@@ -598,7 +606,31 @@ func writeFilesListMetadata(
 		Path: "assertion: this placeholder legacy Files entry should never be opened",
 	}
 	slimManifest.Files = []backuppb.BackupManifest_File{bogusFile}
-	slimManifest.HasExternalFilesList = true
+
+	// We write a bogus descriptor to Descriptors and DescriptorChanges with max
+	// timestamp as the modification time so RunPostDeserializationChanges()
+	// always fails on restore.
+	bogusTableID := descpb.ID(1)
+	bogusTableDesc := descpb.Descriptor{
+		Union: &descpb.Descriptor_Table{
+			Table: &descpb.TableDescriptor{
+				ID:               bogusTableID,
+				Name:             "assertion: this placeholder legacy Descriptor entry should never be used",
+				Version:          1,
+				ModificationTime: hlc.MaxTimestamp,
+			},
+		},
+	}
+	slimManifest.Descriptors = []descpb.Descriptor{bogusTableDesc}
+
+	bogusDescRev := backuppb.BackupManifest_DescriptorRevision{
+		ID:   bogusTableID,
+		Time: hlc.MaxTimestamp,
+		Desc: &bogusTableDesc,
+	}
+	slimManifest.DescriptorChanges = []backuppb.BackupManifest_DescriptorRevision{bogusDescRev}
+
+	slimManifest.HasExternalManifestSSTs = true
 	return WriteBackupManifest(ctx, exportStore, filename, encryption, kmsEnv, &slimManifest)
 }
@@ -926,14 +958,23 @@ func GetBackupIndexAtTime(
 // LoadSQLDescsFromBackupsAtTime returns the Descriptors found in the last
 // (latest) backup with a StartTime >= asOf.
+//
+// TODO(rui): note that materializing all descriptors doesn't scale with cluster
+// size. We temporarily materialize all descriptors here to limit the scope of
+// changes required to use BackupManifest with iterating repeated fields in
+// restore.
 func LoadSQLDescsFromBackupsAtTime(
-	backupManifests []backuppb.BackupManifest, asOf hlc.Timestamp,
+	ctx context.Context,
+	backupManifests []backuppb.BackupManifest,
+	layerToBackupManifestFileIterFactory LayerToBackupManifestFileIterFactory,
+	asOf hlc.Timestamp,
 ) ([]catalog.Descriptor, backuppb.BackupManifest, error) {
 	lastBackupManifest := backupManifests[len(backupManifests)-1]
+	lastIterFactory := layerToBackupManifestFileIterFactory[len(backupManifests)-1]

 	if asOf.IsEmpty() {
 		if lastBackupManifest.DescriptorCoverage != tree.AllDescriptors {
-			descs, err := BackupManifestDescriptors(&lastBackupManifest)
+			descs, err := BackupManifestDescriptors(ctx, lastIterFactory, lastBackupManifest.EndTime)
 			return descs, lastBackupManifest, err
 		}

@@ -950,21 +991,56 @@ func LoadSQLDescsFromBackupsAtTime(
 		}
 		lastBackupManifest = b
 	}
-	if len(lastBackupManifest.DescriptorChanges) == 0 {
-		descs, err := BackupManifestDescriptors(&lastBackupManifest)
-		return descs, lastBackupManifest, err
+
+	// From this point on we try to load descriptors based on descriptor
+	// revisions. The algorithm below assumes that descriptor revisions are sorted
+	// by DescChangesLess, which is a sort by descriptor ID, then descending by
+	// revision time for revisions with the same ID. The external SST for
+	// descriptors already has entries sorted in this order; we just have to make
+	// sure the in-memory descriptors in the manifest are ordered as well.
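The sort.Slice call and selection loop that follow implement an "as of" resolution over revisions ordered by DescChangesLess. A compact standalone model of that selection, using ints for IDs and wall times for HLC timestamps:

```go
package main

import (
	"fmt"
	"sort"
)

// rev models a descriptor revision: ID plus a wall-time stand-in for the HLC
// revision timestamp. desc == "" models a dropped descriptor.
type rev struct {
	id   int
	time int64
	desc string
}

// resolveAsOf picks, per descriptor ID, the newest revision at or before asOf.
// It relies on the DescChangesLess ordering from this diff: ID ascending, then
// time descending within an ID, so the first qualifying revision per ID wins
// and the rest can be skipped.
func resolveAsOf(revs []rev, asOf int64) map[int]string {
	sort.Slice(revs, func(i, j int) bool {
		if revs[i].id != revs[j].id {
			return revs[i].id < revs[j].id
		}
		return revs[i].time > revs[j].time // descending time within an ID
	})
	out := make(map[int]string)
	prevID := -1
	for _, r := range revs {
		if r.time > asOf {
			continue // too new
		}
		if r.id == prevID {
			continue // already took the newest qualifying revision for this ID
		}
		if r.desc != "" {
			out[r.id] = r.desc
		}
		prevID = r.id
	}
	return out
}

func main() {
	revs := []rev{
		{1, 1, "t1@v1"}, {1, 5, "t1@v2"},
		{2, 2, "t2@v1"}, {2, 4, ""}, // dropped at time 4
	}
	fmt.Println(resolveAsOf(revs, 3)) // map[1:t1@v1 2:t2@v1]
}
```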
+	sort.Slice(lastBackupManifest.DescriptorChanges, func(i, j int) bool {
+		return DescChangesLess(&lastBackupManifest.DescriptorChanges[i], &lastBackupManifest.DescriptorChanges[j])
+	})
+
+	descRevIt := lastIterFactory.NewDescriptorChangesIter(ctx)
+	defer descRevIt.Close()
+	if ok, err := descRevIt.Valid(); err != nil {
+		return nil, backuppb.BackupManifest{}, err
+	} else if !ok {
+		descs, err := BackupManifestDescriptors(ctx, lastIterFactory, lastBackupManifest.EndTime)
+		if err != nil {
+			return nil, backuppb.BackupManifest{}, err
+		}
+		return descs, lastBackupManifest, nil
 	}
-
-	byID := make(map[descpb.ID]catalog.DescriptorBuilder, len(lastBackupManifest.Descriptors))
-	for _, rev := range lastBackupManifest.DescriptorChanges {
-		if asOf.Less(rev.Time) {
+	byID := make(map[descpb.ID]catalog.DescriptorBuilder, 0)
+	prevRevID := descpb.InvalidID
+	for ; ; descRevIt.Next() {
+		if ok, err := descRevIt.Valid(); err != nil {
+			return nil, backuppb.BackupManifest{}, err
+		} else if !ok {
 			break
 		}
-		if rev.Desc == nil {
-			delete(byID, rev.ID)
-		} else {
+
+		rev := descRevIt.Value()
+		if asOf.Less(rev.Time) {
+			continue
+		}
+
+		// At this point descriptor revisions are sorted by DescChangesLess, which
+		// is a sort by descriptor ID, then descending by revision time for
+		// revisions with the same ID. If we've already seen a revision for this
+		// descriptor ID that's not greater than asOf, then we can skip the rest of
+		// the revisions for the ID.
+		if rev.ID == prevRevID {
+			continue
+		}
+
+		if rev.Desc != nil {
 			byID[rev.ID] = newDescriptorBuilder(rev.Desc, rev.Time)
 		}
+		prevRevID = rev.ID
 	}

 	allDescs := make([]catalog.Descriptor, 0, len(byID))
@@ -1148,11 +1224,20 @@ func TempCheckpointFileNameForJob(jobID jobspb.JobID) string {
 // BackupManifestDescriptors returns the descriptors encoded in the manifest as
 // a slice of mutable descriptors.
 func BackupManifestDescriptors(
-	backupManifest *backuppb.BackupManifest,
+	ctx context.Context, iterFactory *IterFactory, endTime hlc.Timestamp,
 ) ([]catalog.Descriptor, error) {
-	ret := make([]catalog.Descriptor, 0, len(backupManifest.Descriptors))
-	for i := range backupManifest.Descriptors {
-		b := newDescriptorBuilder(&backupManifest.Descriptors[i], backupManifest.EndTime)
+	descIt := iterFactory.NewDescIter(ctx)
+	defer descIt.Close()
+
+	ret := make([]catalog.Descriptor, 0)
+	for ; ; descIt.Next() {
+		if ok, err := descIt.Valid(); err != nil {
+			return nil, err
+		} else if !ok {
+			break
+		}
+
+		b := newDescriptorBuilder(descIt.Value(), endTime)
 		if b == nil {
 			continue
 		}
@@ -1459,3 +1544,125 @@ func MakeBackupCodec(manifest backuppb.BackupManifest) (keys.SQLCodec, error) {
 	}
 	return backupCodec, nil
 }
+
+// IterFactory provides factory methods to construct iterators that iterate
+// over the `BackupManifest_Files`, `Descriptors`, and
+// `BackupManifest_DescriptorRevision` in a `BackupManifest`. It is the
+// caller's responsibility to close the returned iterators.
+type IterFactory struct {
+	m                 *backuppb.BackupManifest
+	store             cloud.ExternalStorage
+	fileSSTPath       string
+	descriptorSSTPath string
+	encryption        *jobspb.BackupEncryptionOptions
+	kmsEnv            cloud.KMSEnv
+}
+
+// NewIterFactory constructs a new IterFactory for a BackupManifest.
+func NewIterFactory( + m *backuppb.BackupManifest, + store cloud.ExternalStorage, + encryption *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, +) *IterFactory { + return &IterFactory{ + m: m, + store: store, + fileSSTPath: BackupMetadataFilesListPath, + descriptorSSTPath: BackupMetadataDescriptorsListPath, + encryption: encryption, + kmsEnv: kmsEnv, + } +} + +// LayerToBackupManifestFileIterFactory is the mapping from the idx of the +// backup layer to an IterFactory. +type LayerToBackupManifestFileIterFactory map[int]*IterFactory + +// NewFileIter creates a new Iterator over BackupManifest_Files. It is assumed +// that the BackupManifest_File are sorted by FileCmp. +func (f *IterFactory) NewFileIter( + ctx context.Context, +) (bulk.Iterator[*backuppb.BackupManifest_File], error) { + if f.m.HasExternalManifestSSTs { + storeFile := storageccl.StoreFile{ + Store: f.store, + FilePath: f.fileSSTPath, + } + var encOpts *roachpb.FileEncryptionOptions + if f.encryption != nil { + key, err := backupencryption.GetEncryptionKey(ctx, f.encryption, f.kmsEnv) + if err != nil { + return nil, err + } + encOpts = &roachpb.FileEncryptionOptions{Key: key} + } + return NewFileSSTIter(ctx, storeFile, encOpts) + } + + return newSlicePointerIterator(f.m.Files), nil +} + +// NewDescIter creates a new Iterator over Descriptors. +func (f *IterFactory) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] { + if f.m.HasExternalManifestSSTs { + backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption, true, f.kmsEnv) + it := DescIterator{ + backing: backing, + } + it.Next() + return &it + } + + return newSlicePointerIterator(f.m.Descriptors) +} + +// NewDescriptorChangesIter creates a new Iterator over +// BackupManifest_DescriptorRevisions. It is assumed that descriptor changes are +// sorted by DescChangesLess. +func (f *IterFactory) NewDescriptorChangesIter( + ctx context.Context, +) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] { + if f.m.HasExternalManifestSSTs { + if f.m.MVCCFilter == backuppb.MVCCFilter_Latest { + // If the manifest is backuppb.MVCCFilter_Latest, then return an empty + // iterator for descriptor changes. + var backing []backuppb.BackupManifest_DescriptorRevision + return newSlicePointerIterator(backing) + } + + backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption, + false, f.kmsEnv) + dri := DescriptorRevisionIterator{ + backing: backing, + } + + dri.Next() + return &dri + } + + return newSlicePointerIterator(f.m.DescriptorChanges) +} + +// GetBackupManifestIterFactories constructs a mapping from the idx of the +// backup layer to an IterFactory. 
+func GetBackupManifestIterFactories( + ctx context.Context, + storeFactory cloud.ExternalStorageFactory, + backupManifests []backuppb.BackupManifest, + encryption *jobspb.BackupEncryptionOptions, + kmsEnv cloud.KMSEnv, +) (map[int]*IterFactory, error) { + layerToFileIterFactory := make(map[int]*IterFactory) + for layer := range backupManifests { + es, err := storeFactory(ctx, backupManifests[layer].Dir) + if err != nil { + return nil, err + } + + f := NewIterFactory(&backupManifests[layer], es, encryption, kmsEnv) + layerToFileIterFactory[layer] = f + } + + return layerToFileIterFactory, nil +} diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go new file mode 100644 index 000000000000..b3246203ec5f --- /dev/null +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go @@ -0,0 +1,300 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupinfo_test + +import ( + "context" + "fmt" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/cloud" + _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestManifestHandlingIteratorOperations tests operations for iterators over +// the external SSTs of a backup manifest. 
+func TestManifestHandlingIteratorOperations(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numFiles = 10
+	const numDescriptors = 10
+	const changesPerDescriptor = 3
+
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///foo",
+		base.ExternalIODirConfig{},
+		tc.Server(0).ClusterSettings(),
+		blobs.TestEmptyBlobClientFactory,
+		username.RootUserName(),
+		tc.Server(0).InternalExecutor().(*sql.InternalExecutor),
+		tc.Server(0).InternalExecutorFactory().(*sql.InternalExecutorFactory),
+		tc.Server(0).DB(),
+		nil, /* limiters */
+		cloud.NilMetrics,
+	)
+	require.NoError(t, err)
+
+	m := makeMockManifest(numFiles, numDescriptors, changesPerDescriptor)
+	require.NoError(t, backupinfo.WriteMetadataWithExternalSSTs(ctx, store, nil, nil, &m))
+
+	iterFactory := backupinfo.NewIterFactory(&m, store, nil, nil)
+
+	fileLess := func(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) bool {
+		return backupinfo.FileCmp(left, right) < 0
+	}
+	var sortedFiles []backuppb.BackupManifest_File
+	for i := range m.Files {
+		sortedFiles = append(sortedFiles, m.Files[i])
+	}
+	sort.Slice(sortedFiles, func(i, j int) bool {
+		return fileLess(sortedFiles[i], sortedFiles[j])
+	})
+
+	descLess := func(left descpb.Descriptor, right descpb.Descriptor) bool {
+		tLeft, _, _, _, _ := descpb.GetDescriptors(&left)
+		tRight, _, _, _, _ := descpb.GetDescriptors(&right)
+		return tLeft.ID < tRight.ID
+	}
+	var sortedDescs []descpb.Descriptor
+	for i := range m.Descriptors {
+		sortedDescs = append(sortedDescs, m.Descriptors[i])
+	}
+	sort.Slice(sortedDescs, func(i, j int) bool {
+		return descLess(sortedDescs[i], sortedDescs[j])
+	})
+
+	descRevsLess := func(
+		left backuppb.BackupManifest_DescriptorRevision,
+		right backuppb.BackupManifest_DescriptorRevision,
+	) bool {
+		return backupinfo.DescChangesLess(&left, &right)
+	}
+	var sortedDescRevs []backuppb.BackupManifest_DescriptorRevision
+	for i := range m.DescriptorChanges {
+		sortedDescRevs = append(sortedDescRevs, m.DescriptorChanges[i])
+	}
+	sort.Slice(sortedDescRevs, func(i, j int) bool {
+		return descRevsLess(sortedDescRevs[i], sortedDescRevs[j])
+	})
+
+	t.Run("files", func(t *testing.T) {
+		checkIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory), sortedFiles, fileLess)
+	})
+	t.Run("descriptors", func(t *testing.T) {
+		checkIteratorOperations(t, iterFactory.NewDescIter, sortedDescs, descLess)
+	})
+	t.Run("descriptor-changes", func(t *testing.T) {
+		checkIteratorOperations(t, iterFactory.NewDescriptorChangesIter, sortedDescRevs, descRevsLess)
+	})
+}
+
+// TestManifestHandlingEmptyIterators tests operations for an empty external
+// manifest SST iterator.
+func TestManifestHandlingEmptyIterators(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///foo", + base.ExternalIODirConfig{}, + tc.Server(0).ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + username.RootUserName(), + tc.Server(0).InternalExecutor().(*sql.InternalExecutor), + tc.Server(0).InternalExecutorFactory().(*sql.InternalExecutorFactory), + tc.Server(0).DB(), + nil, /* limiters */ + cloud.NilMetrics, + ) + require.NoError(t, err) + + m := makeMockManifest(0, 0, 0) + require.NoError(t, backupinfo.WriteMetadataWithExternalSSTs(ctx, store, nil, nil, &m)) + + iterFactory := backupinfo.NewIterFactory(&m, store, nil, nil) + t.Run("files", func(t *testing.T) { + checkEmptyIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory)) + }) + t.Run("descriptors", func(t *testing.T) { + checkEmptyIteratorOperations(t, iterFactory.NewDescIter) + }) + t.Run("descriptor-changes", func(t *testing.T) { + checkEmptyIteratorOperations(t, iterFactory.NewDescriptorChangesIter) + }) +} + +func makeMockManifest( + numFiles int, numDescriptors int, changesPerDescriptor int, +) backuppb.BackupManifest { + m := backuppb.BackupManifest{} + m.HasExternalManifestSSTs = true + m.MVCCFilter = backuppb.MVCCFilter_All + for i := 0; i < numFiles; i++ { + spKey := fmt.Sprintf("/Table/%04d", i) + spEndKey := fmt.Sprintf("/Table/%04d", i+1) + f := backuppb.BackupManifest_File{ + Span: roachpb.Span{ + Key: []byte(spKey), + EndKey: []byte(spEndKey), + }, + Path: fmt.Sprintf("file%04d.sst", i), + } + m.Files = append(m.Files, f) + } + + for i := 1; i <= numDescriptors; i++ { + // Have some deleted descriptors as well. + isDeleted := i%5 == 4 + + tbl := descpb.TableDescriptor{ID: descpb.ID(i), + Name: fmt.Sprintf("table%d", i), + Version: descpb.DescriptorVersion(changesPerDescriptor), + } + desc := descpb.Descriptor{Union: &descpb.Descriptor_Table{Table: &tbl}} + if !isDeleted { + m.Descriptors = append(m.Descriptors, desc) + } + + for j := 1; j <= changesPerDescriptor; j++ { + tbl.Version = descpb.DescriptorVersion(j) + rev := backuppb.BackupManifest_DescriptorRevision{ + Time: hlc.Timestamp{WallTime: int64(j)}, + ID: tbl.ID, + Desc: &desc, + } + + if isDeleted && j == changesPerDescriptor { + rev.Desc = nil + } + m.DescriptorChanges = append(m.DescriptorChanges, rev) + } + } + + return m +} + +func checkIteratorOperations[T any]( + t *testing.T, + mkIter func(context.Context) bulk.Iterator[*T], + expected []T, + less func(left T, right T) bool, +) { + ctx := context.Background() + + // 1. Check if the iterator returns the expected contents, regardless of how + // many times value is called between calls to Next(). + for numValueCalls := 1; numValueCalls <= 5; numValueCalls++ { + var actual []T + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + var value T + for i := 0; i < numValueCalls; i++ { + value = *it.Value() + } + + actual = append(actual, value) + } + + sort.Slice(actual, func(i, j int) bool { + return less(actual[i], actual[j]) + }) + + require.Equal(t, expected, actual, fmt.Sprintf("contents not equal if there are %d calls to Value()", numValueCalls)) + } + + // 2. Check that we can repeatedly call Next() and Value() after the iterator + // is done. 
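The check that follows pins down the post-exhaustion contract. A small model of an iterator honoring it; illustrative only, since the real iterators also latch errors and buffer values as shown earlier:

```go
package main

import "fmt"

// doneIter models the post-exhaustion contract the test below verifies: once
// the underlying data is consumed, Next() stays a no-op, Valid() keeps
// returning (false, nil), and Value() returns a zero value rather than
// panicking.
type doneIter struct {
	elems []int
	idx   int
}

func (d *doneIter) Valid() (bool, error) { return d.idx < len(d.elems), nil }
func (d *doneIter) Value() int {
	if d.idx < len(d.elems) {
		return d.elems[d.idx]
	}
	return 0 // zero value after exhaustion; no panic
}
func (d *doneIter) Next() {
	if d.idx < len(d.elems) {
		d.idx++
	}
}
func (d *doneIter) Close() {}

func main() {
	it := &doneIter{elems: []int{42}}
	defer it.Close()
	it.Next() // consume the single element
	for i := 0; i < 3; i++ {
		it.Next() // safe to keep calling
		ok, err := it.Valid()
		fmt.Println(ok, err, it.Value()) // false <nil> 0, every time
	}
}
```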
+ it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + } + + for i := 0; i < 10; i++ { + it.Next() + ok, err := it.Valid() + require.False(t, ok) + require.NoError(t, err) + + it.Value() // Should not error or panic. + } + + // 3. Check that we can get the value without calling Valid(). + itNoCheck := mkIter(ctx) + defer itNoCheck.Close() + require.Greater(t, len(expected), 0) + value := itNoCheck.Value() + require.Contains(t, expected, *value) + + ok, err := itNoCheck.Valid() + require.True(t, ok) + require.NoError(t, err) +} + +func checkEmptyIteratorOperations[T any]( + t *testing.T, mkIter func(context.Context) bulk.Iterator[*T], +) { + ctx := context.Background() + + // Check that regardless of how many calls to Next() the iterator will not be + // valid. + for numNextCalls := 0; numNextCalls < 5; numNextCalls++ { + it := mkIter(ctx) + defer it.Close() + for i := 0; i < numNextCalls; i++ { + it.Next() + } + + ok, err := it.Valid() + require.NoError(t, err) + require.False(t, ok) + + it.Value() // Should not error or panic. + } +} + +func mustCreateFileIterFactory( + t *testing.T, iterFactory *backupinfo.IterFactory, +) func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { + return func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { + it, err := iterFactory.NewFileIter(ctx) + require.NoError(t, err) + return it + } +} diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index 0f2185294af8..e3fbb96bbf4c 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -131,12 +131,13 @@ message BackupManifest { int32 descriptor_coverage = 22 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.DescriptorCoverage"]; - // HasExternalFilesList is set to true if the backup manifest has its `Files` - // field nil'ed out and written as a supporting SST file instead. + // HasExternalManifestSSTs is set to true if the backup manifest has its + // `Files`, `Descriptors`, and DescriptorChanges fields nil'ed out and written + // as a supporting SST file instead. // // TODO(adityamaru): Delete when backwards compatibility with 22.2 is dropped // since all backups in 23.1+ will write slim manifests. 
- bool has_external_files_list = 27; + bool has_external_manifest_ssts = 27 [(gogoproto.customname) = "HasExternalManifestSSTs"]; // NEXT ID: 28 } diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 69dd87441e5c..517fb8df5da4 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -13,6 +13,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -83,7 +84,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(b, err) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg.DistSQLSrv.ExternalStorage, + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(b, err) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index ec5748053353..2107e965034d 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -613,7 +614,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) { if err != nil { t.Fatal(err) } - if info.Name() == backupbase.BackupManifestName || !strings.HasSuffix(path, ".sst") { + if info.Name() == backupbase.BackupManifestName || !strings.HasSuffix(path, ".sst") || info.Name() == backupinfo.BackupMetadataDescriptorsListPath || info.Name() == backupinfo.BackupMetadataFilesListPath { return nil } return os.Remove(path) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 7a3c2e77e83f..3a51c8f3fd9c 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -10,11 +10,13 @@ package backupccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -227,7 +229,7 @@ func (gssp *generativeSplitAndScatterProcessor) close() { func makeBackupMetadata( ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.GenerativeSplitAndScatterSpec, -) ([]backuppb.BackupManifest, layerToBackupManifestFileIterFactory, error) { +) ([]backuppb.BackupManifest, backupinfo.LayerToBackupManifestFileIterFactory, error) { kmsEnv := backupencryption.MakeBackupKMSEnv(flowCtx.Cfg.Settings, &flowCtx.Cfg.ExternalIODirConfig, flowCtx.Cfg.DB, spec.User(), flowCtx.Cfg.Executor) @@ 
-238,7 +240,7 @@ func makeBackupMetadata( return nil, nil, err } - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, flowCtx.Cfg.ExternalStorage, + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, flowCtx.Cfg.ExternalStorage, backupManifests, spec.Encryption, &kmsEnv) if err != nil { return nil, nil, err diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go index a19d53bce1d6..0b63fe14c599 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -54,13 +53,12 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { // has been processed by the generative split and scatterer. s0 := tc.Server(0) registry := tc.Server(0).JobRegistry().(*jobs.Registry) - execCfg := s0.ExecutorConfig().(sql.ExecutorConfig) + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) flowCtx := execinfra.FlowCtx{ Cfg: &execinfra.ServerConfig{ - Settings: st, - DB: s0.InternalDB().(descs.DB), - JobRegistry: registry, - ExecutorConfig: &execCfg, + Settings: st, + DB: s0.DB(), + JobRegistry: registry, TestingKnobs: execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ RunAfterSplitAndScatteringEntry: func(ctx context.Context) { @@ -68,9 +66,10 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { }, }, }, + ExternalStorageFromURI: execCfg.DistSQLSrv.ExternalStorageFromURI, + ExternalStorage: execCfg.DistSQLSrv.ExternalStorage, }, EvalCtx: &evalCtx, - Mon: evalCtx.TestingMon, DiskMonitor: testDiskMonitor, NodeID: evalCtx.NodeID, } @@ -108,8 +107,8 @@ func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) { kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, tableRekeys, nil, false) require.NoError(t, err) - chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} - chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)} + chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB, kr)} + chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB, kr)} // Large enough so doneScatterCh never blocks. doneScatterCh := make(chan entryNode, 1000) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 232f0a45f063..aa0d72acf4cf 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -281,10 +281,9 @@ func restore( // which are grouped by keyrange. 
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, - backupManifests, encryption, kmsEnv) + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, backupManifests, encryption, kmsEnv) if err != nil { - return emptyRowCount, err + return roachpb.RowCount{}, err } simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) @@ -318,7 +317,7 @@ func restore( ctx, dataToRestore.getSpans(), backupManifests, - layerToBackupManifestFileIterFactory, + layerToIterFactory, backupLocalityMap, introducedSpanFrontier, highWaterMark, @@ -493,7 +492,13 @@ func loadBackupSQLDescs( return nil, backuppb.BackupManifest{}, nil, 0, err } - allDescs, latestBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, details.EndTime) + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, + backupManifests, encryption, kmsEnv) + if err != nil { + return nil, backuppb.BackupManifest{}, nil, 0, err + } + + allDescs, latestBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, backupManifests, layerToBackupManifestFileIterFactory, details.EndTime) if err != nil { return nil, backuppb.BackupManifest{}, nil, 0, err } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index d66d96eaa828..87030a067407 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -799,15 +799,30 @@ func maybeUpgradeDescriptors(descs []catalog.Descriptor, skipFKsWithNoMatchingTa // "other" table is missing from the set provided are omitted during the // upgrade, instead of causing an error to be returned. func maybeUpgradeDescriptorsInBackupManifests( - backupManifests []backuppb.BackupManifest, skipFKsWithNoMatchingTable bool, + ctx context.Context, + backupManifests []backuppb.BackupManifest, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, + skipFKsWithNoMatchingTable bool, ) error { if len(backupManifests) == 0 { return nil } + // TODO(rui): We do not need to upgrade descriptors that exist in the external + // SST to the 19.2 style because they would've been generated by at least + // version 22.2. Delete this function once backups must use manifests with + // external SSTs. + newDescriptorStyleVersion := roachpb.Version{ + Major: 19, + Minor: 2, + } + if !backupManifests[0].ClusterVersion.Less(newDescriptorStyleVersion) { + return nil + } + descriptors := make([]catalog.Descriptor, 0, len(backupManifests[0].Descriptors)) for i := range backupManifests { - descs, err := backupinfo.BackupManifestDescriptors(&backupManifests[i]) + descs, err := backupinfo.BackupManifestDescriptors(ctx, layerToIterFactory[i], backupManifests[i].EndTime) if err != nil { return err } @@ -1570,10 +1585,24 @@ func doRestorePlan( // be caught by backups. 
wasOffline := make(map[tableAndIndex]hlc.Timestamp) - for _, m := range mainBackupManifests { + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, mainBackupManifests, encryption, &kmsEnv) + if err != nil { + return err + } + + for i, m := range mainBackupManifests { spans := roachpb.Spans(m.Spans) - for i := range m.Descriptors { - table, _, _, _, _ := descpb.GetDescriptors(&m.Descriptors[i]) + descIt := layerToIterFactory[i].NewDescIter(ctx) + defer descIt.Close() + + for ; ; descIt.Next() { + if ok, err := descIt.Valid(); err != nil { + return err + } else if !ok { + break + } + + table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()) if table == nil { continue } @@ -1598,7 +1627,7 @@ func doRestorePlan( } sqlDescs, restoreDBs, descsByTablePattern, tenants, err := selectTargets( - ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, + ctx, p, mainBackupManifests, layerToIterFactory, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, ) if err != nil { return errors.Wrap(err, @@ -1606,7 +1635,7 @@ func doRestorePlan( "use SHOW BACKUP to find correct targets") } - if err := checkMissingIntroducedSpans(sqlDescs, mainBackupManifests, endTime, backupCodec); err != nil { + if err := checkMissingIntroducedSpans(ctx, sqlDescs, mainBackupManifests, layerToIterFactory, endTime, backupCodec); err != nil { return err } diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index cdd4057ba9b5..181b474cf160 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -13,19 +13,15 @@ import ( "context" "sort" - "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" - "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" - "github.com/cockroachdb/cockroach/pkg/cloud" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" - "github.com/cockroachdb/errors" ) type intervalSpan roachpb.Span @@ -67,79 +63,6 @@ type backupManifestFileIterator interface { peek() (backuppb.BackupManifest_File, bool) err() error close() - reset() -} - -// inMemoryFileIterator iterates over the `BackupManifest_Files` field stored -// in-memory in the manifest. 
-type inMemoryFileIterator struct { - manifest *backuppb.BackupManifest - curIdx int -} - -func (i *inMemoryFileIterator) next() (backuppb.BackupManifest_File, bool) { - f, hasNext := i.peek() - i.curIdx++ - return f, hasNext -} - -func (i *inMemoryFileIterator) peek() (backuppb.BackupManifest_File, bool) { - if i.curIdx >= len(i.manifest.Files) { - return backuppb.BackupManifest_File{}, false - } - f := i.manifest.Files[i.curIdx] - return f, true -} - -func (i *inMemoryFileIterator) err() error { - return nil -} - -func (i *inMemoryFileIterator) close() {} - -func (i *inMemoryFileIterator) reset() { - i.curIdx = 0 -} - -var _ backupManifestFileIterator = &inMemoryFileIterator{} - -// makeBackupManifestFileIterator returns a backupManifestFileIterator that can -// be used to iterate over the `BackupManifest_Files` of the manifest. -func makeBackupManifestFileIterator( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - m backuppb.BackupManifest, - encryption *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, -) (backupManifestFileIterator, error) { - if m.HasExternalFilesList { - es, err := storeFactory(ctx, m.Dir) - if err != nil { - return nil, err - } - storeFile := storageccl.StoreFile{ - Store: es, - FilePath: backupinfo.BackupMetadataFilesListPath, - } - var encOpts *roachpb.FileEncryptionOptions - if encryption != nil { - key, err := backupencryption.GetEncryptionKey(ctx, encryption, kmsEnv) - if err != nil { - return nil, err - } - encOpts = &roachpb.FileEncryptionOptions{Key: key} - } - it, err := backupinfo.NewFileSSTIter(ctx, storeFile, encOpts) - if err != nil { - return nil, errors.Wrap(err, "failed to create new FileSST iterator") - } - return &sstFileIterator{fi: it}, nil - } - - return &inMemoryFileIterator{ - manifest: &m, - curIdx: 0, - }, nil } // sstFileIterator uses an underlying `backupinfo.FileIterator` to read the @@ -173,10 +96,6 @@ func (s *sstFileIterator) close() { s.fi.Close() } -func (s *sstFileIterator) reset() { - s.fi.Reset() -} - var _ backupManifestFileIterator = &sstFileIterator{} // makeSimpleImportSpans partitions the spans of requiredSpans into a covering @@ -209,9 +128,10 @@ var _ backupManifestFileIterator = &sstFileIterator{} // if its current data size plus that of the new span is less than the target // size. func makeSimpleImportSpans( + ctx context.Context, requiredSpans roachpb.Spans, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, introducedSpanFrontier *spanUtils.Frontier, lowWaterMark roachpb.Key, @@ -266,18 +186,24 @@ func makeSimpleImportSpans( // we reach out to ExternalStorage to read the accompanying SST that // contains the BackupManifest_Files. iterFactory := layerToBackupManifestFileIterFactory[layer] - it, err := iterFactory() + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() + defer it.Close() covPos := spanCoverStart // lastCovSpanSize is the size of files added to the right-most span of // the cover so far. 
var lastCovSpanSize int64 - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + f := it.Value() if sp := span.Intersect(f.Span); sp.Valid() { fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { @@ -342,9 +268,6 @@ func makeSimpleImportSpans( break } } - if err := it.err(); err != nil { - return nil, err - } } } @@ -383,32 +306,6 @@ func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrap } } -type layerToBackupManifestFileIterFactory map[int]func() (backupManifestFileIterator, error) - -// getBackupManifestFileIters constructs a mapping from the idx of the backup -// layer to a factory method to construct a backupManifestFileIterator. This -// iterator can be used to iterate over the `BackupManifest_Files` in a -// `BackupManifest`. It is the callers responsibility to close the returned -// iterators. -func getBackupManifestFileIters( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - backupManifests []backuppb.BackupManifest, - encryption *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, -) (map[int]func() (backupManifestFileIterator, error), error) { - layerToFileIterFactory := make(map[int]func() (backupManifestFileIterator, error)) - for layer := range backupManifests { - layer := layer - layerToFileIterFactory[layer] = func() (backupManifestFileIterator, error) { - manifest := backupManifests[layer] - return makeBackupManifestFileIterator(ctx, storeFactory, manifest, encryption, kmsEnv) - } - } - - return layerToFileIterFactory, nil -} - // generateAndSendImportSpans partitions the spans of requiredSpans into a // covering of RestoreSpanEntry's which each have all overlapping files from the // passed backups assigned to them. 
The spans of requiredSpans are @@ -454,7 +351,7 @@ func generateAndSendImportSpans( ctx context.Context, requiredSpans roachpb.Spans, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, introducedSpanFrontier *spanUtils.Frontier, lowWaterMark roachpb.Key, @@ -463,7 +360,7 @@ func generateAndSendImportSpans( useSimpleImportSpans bool, ) error { if useSimpleImportSpans { - importSpans, err := makeSimpleImportSpans(requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) if err != nil { return err } @@ -474,14 +371,14 @@ func generateAndSendImportSpans( return nil } - startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(backups, layerToBackupManifestFileIterFactory) + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(ctx, backups, layerToBackupManifestFileIterFactory) if err != nil { return err } - fileIterByLayer := make([]backupManifestFileIterator, 0, len(backups)) + fileIterByLayer := make([]bulk.Iterator[*backuppb.BackupManifest_File], 0, len(backups)) for layer := range backups { - iter, err := layerToBackupManifestFileIterFactory[layer]() + iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx) if err != nil { return err } @@ -493,7 +390,7 @@ func generateAndSendImportSpans( // the cover so far. var lastCovSpanSize int64 var lastCovSpan roachpb.Span - var covFilesByLayer [][]backuppb.BackupManifest_File + var covFilesByLayer [][]*backuppb.BackupManifest_File var firstInSpan bool flush := func(ctx context.Context) error { @@ -591,7 +488,7 @@ func generateAndSendImportSpans( return err } - var filesByLayer [][]backuppb.BackupManifest_File + var filesByLayer [][]*backuppb.BackupManifest_File var covSize int64 var newCovFilesSize int64 @@ -694,16 +591,18 @@ func generateAndSendImportSpans( // [a, b, c, e, f, g] type fileSpanStartAndEndKeyIterator struct { heap *fileHeap - allIters []backupManifestFileIterator + allIters []bulk.Iterator[*backuppb.BackupManifest_File] err error } func newFileSpanStartAndEndKeyIterator( - backups []backuppb.BackupManifest, layerToIterFactory layerToBackupManifestFileIterFactory, + ctx context.Context, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, ) (*fileSpanStartAndEndKeyIterator, error) { it := &fileSpanStartAndEndKeyIterator{} for layer := range backups { - iter, err := layerToIterFactory[layer]() + iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx) if err != nil { return nil, err } @@ -730,14 +629,13 @@ func (i *fileSpanStartAndEndKeyIterator) next() { } if minItem.cmpEndKey { - file, ok := minItem.fileIter.next() - if err := minItem.fileIter.err(); err != nil { + minItem.fileIter.Next() + if ok, err := minItem.fileIter.Valid(); err != nil { i.err = err return - } - if ok { + } else if ok { minItem.cmpEndKey = false - minItem.file = file + minItem.file = minItem.fileIter.Value() heap.Push(i.heap, minItem) } } else { @@ -766,27 +664,25 @@ func (i *fileSpanStartAndEndKeyIterator) reset() { i.err = nil for _, iter := range i.allIters { - iter.reset() - - file, ok := iter.next() - if err := 
iter.err(); err != nil { + if ok, err := iter.Valid(); err != nil { i.err = err return + } else if !ok { + continue } - if ok { - i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ - fileIter: iter, - file: file, - cmpEndKey: false, - }) - } + + i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ + fileIter: iter, + file: iter.Value(), + cmpEndKey: false, + }) } heap.Init(i.heap) } type fileHeapItem struct { - fileIter backupManifestFileIterator - file backuppb.BackupManifest_File + fileIter bulk.Iterator[*backuppb.BackupManifest_File] + file *backuppb.BackupManifest_File cmpEndKey bool } @@ -831,19 +727,23 @@ func (f *fileHeap) Pop() any { } func getNewIntersectingFilesByLayer( - span roachpb.Span, layersCoveredLater map[int]bool, fileIters []backupManifestFileIterator, -) ([][]backuppb.BackupManifest_File, error) { - var files [][]backuppb.BackupManifest_File + span roachpb.Span, + layersCoveredLater map[int]bool, + fileIters []bulk.Iterator[*backuppb.BackupManifest_File], +) ([][]*backuppb.BackupManifest_File, error) { + var files [][]*backuppb.BackupManifest_File for l, iter := range fileIters { - var layerFiles []backuppb.BackupManifest_File + var layerFiles []*backuppb.BackupManifest_File if !layersCoveredLater[l] { - for ; ; iter.next() { - f, ok := iter.peek() - if !ok { + for ; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return nil, err + } else if !ok { break } + f := iter.Value() if span.Overlaps(f.Span) { layerFiles = append(layerFiles, f) } @@ -852,9 +752,6 @@ func getNewIntersectingFilesByLayer( break } } - if iter.err() != nil { - return nil, iter.err() - } } files = append(files, layerFiles) } diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index a6e88764e562..fc4b550e38e0 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -62,7 +64,7 @@ func MockBackupChain( } for i := range backups { - backups[i].HasExternalFilesList = hasExternalFilesList + backups[i].HasExternalManifestSSTs = hasExternalFilesList backups[i].Spans = make(roachpb.Spans, spans) backups[i].IntroducedSpans = make(roachpb.Spans, 0) for j := range backups[i].Spans { @@ -108,15 +110,14 @@ func MockBackupChain( backups[i].Files[f].EntryCounts.DataSize = 1 << 20 } - config := cloudpb.ExternalStorage{S3Config: &cloudpb.ExternalStorage_S3{}} - if backups[i].HasExternalFilesList { + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + if err != nil { + return nil, err + } + config := es.Conf() + if backups[i].HasExternalManifestSSTs { // Write the Files to an SST and put them at a well known location. 
- es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, - fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) - if err != nil { - return nil, err - } - config = es.Conf() manifestCopy := backups[i] err = backupinfo.WriteFilesListSST(ctx, es, nil, nil, &manifestCopy, backupinfo.BackupMetadataFilesListPath) @@ -124,6 +125,13 @@ func MockBackupChain( return nil, err } backups[i].Files = nil + + err = backupinfo.WriteDescsSST(ctx, &manifestCopy, es, nil, nil, backupinfo.BackupMetadataDescriptorsListPath) + if err != nil { + return nil, err + } + backups[i].Descriptors = nil + backups[i].DescriptorChanges = nil } // A non-nil Dir more accurately models the footprint of produced coverings. backups[i].Dir = config @@ -159,9 +167,14 @@ func checkRestoreCovering( return err } + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, storageFactory, backups, nil, nil) + if err != nil { + return err + } + for _, span := range spans { var last roachpb.Key - for _, b := range backups { + for i, b := range backups { var coveredLater bool introducedSpanFrontier.Entries(func(s roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) { @@ -178,13 +191,18 @@ func checkRestoreCovering( // for explanation. continue } - it, err := makeBackupManifestFileIterator(ctx, storageFactory, b, - nil, nil) + it, err := layerToIterFactory[i].NewFileIter(ctx) if err != nil { return err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + break + } + f := it.Value() if sp := span.Intersect(f.Span); sp.Valid() { if required[f.Path] == nil { required[f.Path] = &roachpb.SpanGroup{} @@ -195,9 +213,6 @@ func checkRestoreCovering( } } } - if it.err() != nil { - return it.err() - } } } var spanIdx int @@ -230,7 +245,7 @@ func makeImportSpans( ctx context.Context, spans []roachpb.Span, backups []backuppb.BackupManifest, - layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, targetSize int64, introducedSpanFrontier *spanUtils.Frontier, useSimpleImportSpans bool, @@ -245,7 +260,7 @@ func makeImportSpans( return nil }) - err := generateAndSendImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + err := generateAndSendImportSpans(ctx, spans, backups, layerToIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) close(spanCh) if err != nil { @@ -257,6 +272,54 @@ func makeImportSpans( return cover, nil } +type coverutils struct { + dir cloudpb.ExternalStorage +} + +func makeCoverUtils(ctx context.Context, t *testing.T, execCfg *sql.ExecutorConfig) coverutils { + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + require.NoError(t, err) + dir := es.Conf() + return coverutils{ + dir: dir, + } +} + +func (c coverutils) sp(start, end string) roachpb.Span { + return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} +} + +func (c coverutils) makeManifests(manifests []roachpb.Spans) []backuppb.BackupManifest { + ms := make([]backuppb.BackupManifest, len(manifests)) + fileCount := 1 + for i, manifest := range manifests { + ms[i].StartTime = hlc.Timestamp{WallTime: int64(i)} + ms[i].EndTime = hlc.Timestamp{WallTime: 
int64(i + 1)} + ms[i].Files = make([]backuppb.BackupManifest_File, len(manifest)) + ms[i].Dir = c.dir + for j, sp := range manifest { + ms[i].Files[j] = backuppb.BackupManifest_File{ + Span: sp, + Path: fmt.Sprintf("%d", fileCount), + + // Pretend every span has 1MB. + EntryCounts: roachpb.RowCount{DataSize: 1 << 20}, + } + fileCount++ + } + } + return ms +} + +func (c coverutils) paths(names ...string) []execinfrapb.RestoreFileSpec { + r := make([]execinfrapb.RestoreFileSpec, len(names)) + for i := range names { + r[i].Path = names[i] + r[i].Dir = c.dir + } + return r +} func TestRestoreEntryCoverExample(t *testing.T) { defer leaktest.AfterTest(t)() @@ -267,28 +330,16 @@ func TestRestoreEntryCoverExample(t *testing.T) { InitManualReplication) defer cleanupFn() - sp := func(start, end string) roachpb.Span { - return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} - } - f := func(start, end, path string) backuppb.BackupManifest_File { - return backuppb.BackupManifest_File{Span: sp(start, end), Path: path} - } - paths := func(names ...string) []execinfrapb.RestoreFileSpec { - r := make([]execinfrapb.RestoreFileSpec, len(names)) - for i := range names { - r[i].Path = names[i] - } - return r - } + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) // Setup and test the example in the comment of makeSimpleImportSpans. - spans := []roachpb.Span{sp("a", "f"), sp("f", "i"), sp("l", "m")} - backups := []backuppb.BackupManifest{ - {Files: []backuppb.BackupManifest_File{f("a", "c", "1"), f("c", "e", "2"), f("h", "i", "3")}}, - {Files: []backuppb.BackupManifest_File{f("b", "d", "4"), f("g", "i", "5")}}, - {Files: []backuppb.BackupManifest_File{f("a", "h", "6"), f("j", "k", "7")}}, - {Files: []backuppb.BackupManifest_File{f("h", "i", "8"), f("l", "m", "9")}}, - } + spans := []roachpb.Span{c.sp("a", "f"), c.sp("f", "i"), c.sp("l", "m")} + backups := c.makeManifests([]roachpb.Spans{ + {c.sp("a", "c"), c.sp("c", "e"), c.sp("h", "i")}, + {c.sp("b", "d"), c.sp("g", "i")}, + {c.sp("a", "h"), c.sp("j", "k")}, + {c.sp("h", "i"), c.sp("l", "m")}}) for i := range backups { backups[i].StartTime = hlc.Timestamp{WallTime: int64(i)} @@ -303,48 +354,157 @@ func TestRestoreEntryCoverExample(t *testing.T) { emptySpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{}) require.NoError(t, err) - execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg.DistSQLSrv.ExternalStorage, + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, emptySpanFrontier, false) + cover, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "b"), Files: paths("1", "6")}, - {Span: sp("b", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "f"), Files: paths("2", "4", "6")}, - {Span: sp("f", "g"), Files: paths("6")}, - {Span: sp("g", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: 
c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, cover) - coverSized, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, 2<<20, emptySpanFrontier, false) + coverSized, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, 2<<20, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "b"), Files: paths("1", "6")}, - {Span: sp("b", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "f"), Files: paths("2", "4", "6")}, - {Span: sp("f", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, coverSized) // check that introduced spans are properly elided - backups[2].IntroducedSpans = []roachpb.Span{sp("a", "f")} + backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, introducedSpanFrontier, false) + coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, introducedSpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "f"), Files: paths("6")}, - {Span: sp("f", "g"), Files: paths("6")}, - {Span: sp("g", "h"), Files: paths("5", "6")}, - {Span: sp("h", "i"), Files: paths("3", "5", "8")}, - {Span: sp("l", "m"), Files: paths("9")}, + {Span: c.sp("a", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, }, coverIntroduced) +} + +func TestFileSpanStartKeyIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + type testSpec struct { + manifestFiles []roachpb.Spans + keysSurfaced []string + expectedError string + } + + for _, sp := range []testSpec{ + { + // adjacent and disjoint files. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "b"), c.sp("c", "d"), c.sp("d", "e")}, + }, + keysSurfaced: []string{"a", "b", "c", "d", "e"}, + }, + { + // shadow start key (b) if another span covers it. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "c"), c.sp("b", "d")}, + }, + keysSurfaced: []string{"a", "c", "d"}, + }, + { + // swap the file order and expect an error. + manifestFiles: []roachpb.Spans{ + {c.sp("b", "d"), c.sp("a", "c")}, + }, + keysSurfaced: []string{"b", "d", "a", "c"}, + expectedError: "out of order backup keys", + }, + { + // overlapping files within a level. 
+ manifestFiles: []roachpb.Spans{ + {c.sp("b", "f"), c.sp("c", "d"), c.sp("e", "g")}, + }, + keysSurfaced: []string{"b", "f", "g"}, + }, + { + // overlapping files within and across levels. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "e"), c.sp("d", "f")}, + {c.sp("b", "c")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "f"}, + }, + { + // overlapping start key in one level, but non overlapping in another level. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "c"), c.sp("b", "d")}, + {c.sp("b", "c")}, + }, + keysSurfaced: []string{"a", "b", "c", "d"}, + }, + { + // overlapping files in both levels. + manifestFiles: []roachpb.Spans{ + {c.sp("b", "e"), c.sp("d", "i")}, + {c.sp("a", "c"), c.sp("b", "h")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "h", "i"}, + }, + { + // ensure everything works with 3 layers. + manifestFiles: []roachpb.Spans{ + {c.sp("a", "e"), c.sp("e", "f")}, + {c.sp("b", "e"), c.sp("e", "f")}, + {c.sp("c", "e"), c.sp("d", "f")}, + }, + keysSurfaced: []string{"a", "b", "c", "e", "f"}, + }, + } { + backups := c.makeManifests(sp.manifestFiles) + + // randomly shuffle the order of the manifests, as order should not matter. + for i := range backups { + j := rand.Intn(i + 1) + backups[i], backups[j] = backups[j], backups[i] + } + + // ensure all the expected keys are surfaced. + layerToBackupManifestFileIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, + backups, nil, nil) + require.NoError(t, err) + + sanityCheckFileIterator(ctx, t, layerToBackupManifestFileIterFactory[0], backups[0]) + + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(ctx, backups, layerToBackupManifestFileIterFactory) + require.NoError(t, err) + + for _, expectedKey := range sp.keysSurfaced { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + require.Error(t, err, sp.expectedError) + } + break + } + expected := roachpb.Key(expectedKey) + require.Equal(t, expected, startEndKeyIt.value()) + startEndKeyIt.next() + } + } } type mockBackupInfo struct { @@ -399,8 +559,13 @@ func createMockManifest( files = append(files, backuppb.BackupManifest_File{Span: sp, Path: path}) } + ctx := context.Background() + es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, + fmt.Sprintf("nodelocal://1/mock%s", timeutil.Now().String()), username.RootUserName()) + require.NoError(t, err) + return backuppb.BackupManifest{Spans: spans, - EndTime: endTime, Files: files} + EndTime: endTime, Files: files, Dir: es.Conf()} } // TestRestoreEntryCoverReIntroducedSpans checks that all reintroduced spans are @@ -545,10 +710,10 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToBackupManifestFileIterFactory, + cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToIterFactory, 0, introducedSpanFrontier, false) require.NoError(t, err) @@ -573,6 +738,31 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { } } +// sanityCheckFileIterator ensures the backup files are surfaced in the order they are stored in +// the manifest. 
+func sanityCheckFileIterator( + ctx context.Context, + t *testing.T, + iterFactory *backupinfo.IterFactory, + backup backuppb.BackupManifest, +) { + iter, err := iterFactory.NewFileIter(ctx) + require.NoError(t, err) + defer iter.Close() + + for _, expectedFile := range backup.Files { + if ok, err := iter.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + t.Fatalf("file iterator should have file with path %s", expectedFile.Path) + } + + file := iter.Value() + require.Equal(t, expectedFile, *file) + iter.Next() + } +} + func TestRestoreEntryCover(t *testing.T) { defer leaktest.AfterTest(t)() r, _ := randutil.NewTestRand() @@ -588,18 +778,19 @@ func TestRestoreEntryCover(t *testing.T) { for _, simpleImportSpans := range []bool{true, false} { backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) require.NoError(t, err) - + layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, + execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) + require.NoError(t, err) + randLayer := rand.Intn(len(backups)) + randBackup := backups[randLayer] + sanityCheckFileIterator(ctx, t, layerToIterFactory[randLayer], randBackup) for _, target := range []int64{0, 1, 4, 100, 1000} { t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t, simple=%t", numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, - execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) - require.NoError(t, err) cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) + layerToIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) require.NoError(t, err) require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index 9a67dd0cab77..1ff260f8b067 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -104,7 +104,7 @@ func (m manifestInfoReader) showBackup( // FKs for which we can't resolve the cross-table references. We can't // display them anyway, because we don't have the referenced table names, // etc. - err := maybeUpgradeDescriptorsInBackupManifests(info.manifests, + err := maybeUpgradeDescriptorsInBackupManifests(ctx, info.manifests, info.layerToIterFactory, true /* skipFKsWithNoMatchingTable */) if err != nil { return err @@ -444,6 +444,12 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o return err } } + + info.layerToIterFactory, err = backupinfo.GetBackupManifestIterFactories(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, info.manifests, info.enc, info.kmsEnv) + if err != nil { + return err + } + // If backup is locality aware, check that user passed at least some localities. // TODO (msbutler): this is an extremely crude check that the user is @@ -572,13 +578,19 @@ func checkBackupFiles( // Check all backup SSTs. 
fileSizes := make([]int64, 0) - it, err := makeBackupManifestFileIterator(ctx, execCfg.DistSQLSrv.ExternalStorage, - info.manifests[layer], encryption, kmsEnv) + it, err := info.layerToIterFactory[layer].NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() store := defaultStore uri := info.defaultURIs[layer] if _, ok := localityStores[f.LocalityKV]; ok { @@ -599,9 +611,6 @@ func checkBackupFiles( } fileSizes = append(fileSizes, sz) } - if it.err() != nil { - return nil, it.err() - } return fileSizes, nil } @@ -636,11 +645,14 @@ type backupInfo struct { collectionURI string defaultURIs []string manifests []backuppb.BackupManifest - subdir string - localityInfo []jobspb.RestoreDetails_BackupLocalityInfo - enc *jobspb.BackupEncryptionOptions - kmsEnv cloud.KMSEnv - fileSizes [][]int64 + // layerToIterFactory is a mapping from the index of the backup layer in + // manifests to its IterFactory. + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory + subdir string + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo + enc *jobspb.BackupEncryptionOptions + kmsEnv cloud.KMSEnv + fileSizes [][]int64 } type backupShower struct { @@ -704,7 +716,7 @@ func backupShowerDefault(p sql.PlanHookState, showSchemas bool, opts tree.KVOpti var rows []tree.Datums for layer, manifest := range info.manifests { - descriptors, err := backupinfo.BackupManifestDescriptors(&manifest) + descriptors, err := backupinfo.BackupManifestDescriptors(ctx, info.layerToIterFactory[layer], manifest.EndTime) if err != nil { return nil, err } @@ -754,7 +766,7 @@ func backupShowerDefault(p sql.PlanHookState, showSchemas bool, opts tree.KVOpti fileSizes = info.fileSizes[layer] } - tableSizes, err := getTableSizes(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, info, manifest, fileSizes) + tableSizes, err := getTableSizes(ctx, info.layerToIterFactory[layer], fileSizes) if err != nil { return nil, err } @@ -927,27 +939,27 @@ type descriptorSize struct { // BackupManifest_File identifies a span in an SST and there can be multiple // spans stored in an SST. 
func getLogicalSSTSize( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - manifest backuppb.BackupManifest, - enc *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, + ctx context.Context, iterFactory *backupinfo.IterFactory, ) (map[string]int64, error) { ctx, span := tracing.ChildSpan(ctx, "backupccl.getLogicalSSTSize") defer span.Finish() sstDataSize := make(map[string]int64) - it, err := makeBackupManifestFileIterator(ctx, storeFactory, manifest, enc, kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() sstDataSize[f.Path] += f.EntryCounts.DataSize } - if it.err() != nil { - return nil, it.err() - } return sstDataSize, nil } @@ -960,30 +972,33 @@ func approximateSpanPhysicalSize( // getTableSizes gathers row and size count for each table in the manifest func getTableSizes( - ctx context.Context, - storeFactory cloud.ExternalStorageFactory, - info backupInfo, - manifest backuppb.BackupManifest, - fileSizes []int64, + ctx context.Context, iterFactory *backupinfo.IterFactory, fileSizes []int64, ) (map[descpb.ID]descriptorSize, error) { ctx, span := tracing.ChildSpan(ctx, "backupccl.getTableSizes") defer span.Finish() - logicalSSTSize, err := getLogicalSSTSize(ctx, storeFactory, manifest, info.enc, info.kmsEnv) + logicalSSTSize, err := getLogicalSSTSize(ctx, iterFactory) if err != nil { return nil, err } - it, err := makeBackupManifestFileIterator(ctx, storeFactory, manifest, info.enc, info.kmsEnv) + it, err := iterFactory.NewFileIter(ctx) if err != nil { return nil, err } - defer it.close() + defer it.Close() tableSizes := make(map[descpb.ID]descriptorSize) var tenantID roachpb.TenantID var showCodec keys.SQLCodec var idx int - for f, hasNext := it.next(); hasNext; f, hasNext = it.next() { + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + + f := it.Value() if !tenantID.IsSet() { var err error _, tenantID, err = keys.DecodeTenantPrefix(f.Span.Key) @@ -1015,9 +1030,6 @@ func getTableSizes( tableSizes[descpb.ID(tableID)] = s idx++ } - if it.err() != nil { - return nil, it.err() - } return tableSizes, nil } @@ -1129,7 +1141,7 @@ var backupShowerDoctor = backupShower{ var namespaceTable doctor.NamespaceTable // Extract all the descriptors from the given manifest and generate the // namespace and descriptor tables needed by doctor. 
-		descriptors, _, err := backupinfo.LoadSQLDescsFromBackupsAtTime(info.manifests, hlc.Timestamp{})
+		descriptors, _, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, info.manifests, info.layerToIterFactory, hlc.Timestamp{})
 		if err != nil {
 			return nil, err
 		}
@@ -1219,20 +1231,24 @@ func backupShowerFileSetup(
 				backupType = "incremental"
 			}
 
-			logicalSSTSize, err := getLogicalSSTSize(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage, manifest,
-				info.enc, info.kmsEnv)
+			logicalSSTSize, err := getLogicalSSTSize(ctx, info.layerToIterFactory[i])
 			if err != nil {
 				return nil, err
 			}
 
-			it, err := makeBackupManifestFileIterator(ctx, p.ExecCfg().DistSQLSrv.ExternalStorage,
-				manifest, info.enc, info.kmsEnv)
+			it, err := info.layerToIterFactory[i].NewFileIter(ctx)
 			if err != nil {
 				return nil, err
 			}
-			defer it.close()
+			defer it.Close()
 			var idx int
-			for file, hasNext := it.next(); hasNext; file, hasNext = it.next() {
+			for ; ; it.Next() {
+				if ok, err := it.Valid(); err != nil {
+					return nil, err
+				} else if !ok {
+					break
+				}
+				file := it.Value()
 				filePath := file.Path
 				if inCol != nil {
 					filePath = path.Join(manifestDirs[i], filePath)
@@ -1263,9 +1279,6 @@ func backupShowerFileSetup(
 				})
 				idx++
 			}
-			if it.err() != nil {
-				return nil, it.err()
-			}
 		}
 		return rows, nil
 	},
diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go
index ec0e189b7c23..f577d8190451 100644
--- a/pkg/ccl/backupccl/targets.go
+++ b/pkg/ccl/backupccl/targets.go
@@ -348,8 +348,10 @@ func fullClusterTargetsBackup(
 // mainBackupManifests are sorted by Endtime and this check only applies to
 // backups with a start time that is less than the restore AOST.
 func checkMissingIntroducedSpans(
+	ctx context.Context,
 	restoringDescs []catalog.Descriptor,
 	mainBackupManifests []backuppb.BackupManifest,
+	layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
 	endTime hlc.Timestamp,
 	codec keys.SQLCodec,
 ) error {
@@ -374,12 +376,20 @@ func checkMissingIntroducedSpans(
 
 		// Gather the _online_ tables included in the previous backup.
 		prevOnlineTables := make(map[descpb.ID]struct{})
-		for _, desc := range mainBackupManifests[i-1].Descriptors {
-			if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Public() {
+		prevDescIt := layerToIterFactory[i-1].NewDescIter(ctx)
+		defer prevDescIt.Close()
+		for ; ; prevDescIt.Next() {
+			if ok, err := prevDescIt.Valid(); err != nil {
+				return err
+			} else if !ok {
+				break
+			}
+
+			if table, _, _, _, _ := descpb.GetDescriptors(prevDescIt.Value()); table != nil && table.Public() {
 				prevOnlineTables[table.GetID()] = struct{}{}
 			}
 		}
 
 		// Gather the tables that were reintroduced in the current backup (i.e.
 		// backed up from ts=0).
 		tablesIntroduced := make(map[descpb.ID]struct{})
@@ -430,10 +440,19 @@ that was running an IMPORT at the time of the previous incremental in this chain
 			})
 		}
-		for _, desc := range mainBackupManifests[i].Descriptors {
+		descIt := layerToIterFactory[i].NewDescIter(ctx)
+		defer descIt.Close()
+
+		for ; ; descIt.Next() {
+			if ok, err := descIt.Valid(); err != nil {
+				return err
+			} else if !ok {
+				break
+			}
+
 			// Check that all online tables at backup time were either introduced or
 			// in the previous backup.
-			if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Public() {
+			if table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); table != nil && table.Public() {
 				if err := requiredIntroduction(table); err != nil {
 					return err
 				}
 			}
@@ -444,8 +463,17 @@ that was running an IMPORT at the time of the previous incremental in this chain
 		// where a descriptor may appear in manifest.DescriptorChanges but not
 		// manifest.Descriptors. If a descriptor switched from offline to online at
 		// any moment during the backup interval, it needs to be reintroduced.
-		for _, desc := range mainBackupManifests[i].DescriptorChanges {
-			if table, _, _, _, _ := descpb.GetDescriptors(desc.Desc); table != nil && table.Public() {
+		descRevIt := layerToIterFactory[i].NewDescriptorChangesIter(ctx)
+		defer descRevIt.Close()
+
+		for ; ; descRevIt.Next() {
+			if ok, err := descRevIt.Valid(); err != nil {
+				return err
+			} else if !ok {
+				break
+			}
+
+			if table, _, _, _, _ := descpb.GetDescriptors(descRevIt.Value().Desc); table != nil && table.Public() {
 				if err := requiredIntroduction(table); err != nil {
 					return err
 				}
@@ -470,6 +498,7 @@ func selectTargets(
 	ctx context.Context,
 	p sql.PlanHookState,
 	backupManifests []backuppb.BackupManifest,
+	layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
 	targets tree.BackupTargetList,
 	descriptorCoverage tree.DescriptorCoverage,
 	asOf hlc.Timestamp,
@@ -482,7 +511,7 @@
 ) {
 	ctx, span := tracing.ChildSpan(ctx, "backupccl.selectTargets")
 	defer span.Finish()
-	allDescs, lastBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, asOf)
+	allDescs, lastBackupManifest, err := backupinfo.LoadSQLDescsFromBackupsAtTime(ctx, backupManifests, layerToIterFactory, asOf)
 	if err != nil {
 		return nil, nil, nil, nil, err
 	}
diff --git a/pkg/sql/execinfrapb/api.go b/pkg/sql/execinfrapb/api.go
index ecd89228bf4f..b755df7844a5 100644
--- a/pkg/sql/execinfrapb/api.go
+++ b/pkg/sql/execinfrapb/api.go
@@ -87,6 +87,7 @@ func (m *ChangeFrontierSpec) User() username.SQLUsername {
 	return m.UserProto.Decode()
 }
 
+// User accesses the user field.
 func (m *GenerativeSplitAndScatterSpec) User() username.SQLUsername {
 	return m.UserProto.Decode()
 }
diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel
index a8ccf07aa3af..ba065f15f790 100644
--- a/pkg/util/bulk/BUILD.bazel
+++ b/pkg/util/bulk/BUILD.bazel
@@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "bulk",
-    srcs = ["tracing_aggregator.go"],
+    srcs = [
+        "iterator.go",
+        "tracing_aggregator.go",
+    ],
     importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk",
     visibility = ["//visibility:public"],
     deps = [
diff --git a/pkg/util/bulk/iterator.go b/pkg/util/bulk/iterator.go
new file mode 100644
index 000000000000..57f7b09dced9
--- /dev/null
+++ b/pkg/util/bulk/iterator.go
@@ -0,0 +1,46 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package bulk
+
+// Iterator is an interface to iterate over a collection of objects of type T.
+type Iterator[T any] interface {
+	// Valid must be called after any call to Next(). It returns (true, nil) if
+	// the iterator points to a valid value, and (false, nil) if the iterator
+	// has moved past the last value. It returns (false, err) if there is an
+	// error in the iterator.
+	Valid() (bool, error)
+
+	// Value returns the current value. The returned value is only valid until
+	// the next call to Next().
+	Value() T
+
+	// Next advances the iterator to the next value.
+	Next()
+
+	// Close closes the iterator.
+	Close()
+}
+
+// CollectToSlice iterates over all objects in the iterator and collects them
+// into a slice.
+func CollectToSlice[T any](iterator Iterator[T]) ([]T, error) {
+	var values []T
+	for ; ; iterator.Next() {
+		if ok, err := iterator.Valid(); err != nil {
+			return nil, err
+		} else if !ok {
+			break
+		}
+
+		values = append(values, iterator.Value())
+	}
+	return values, nil
+}
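
For context on the contract above, here is a minimal sketch of a slice-backed implementation and the canonical consumption loop. The sliceIter type and main function are hypothetical illustrations only, not part of this patch; the only APIs assumed from the change itself are the bulk.Iterator interface and the bulk.CollectToSlice helper.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/bulk"
)

// sliceIter adapts a Go slice to the bulk.Iterator contract.
type sliceIter[T any] struct {
	elems []T
	idx   int
}

// Valid reports whether the iterator currently points at an element.
func (s *sliceIter[T]) Valid() (bool, error) { return s.idx < len(s.elems), nil }

// Value returns the current element, or a zero value once the iterator is
// exhausted; it must not error or panic after the last element.
func (s *sliceIter[T]) Value() T {
	if s.idx >= len(s.elems) {
		var zero T
		return zero
	}
	return s.elems[s.idx]
}

// Next advances the iterator; callers re-check Valid() before reading again.
func (s *sliceIter[T]) Next() { s.idx++ }

// Close releases resources; a slice-backed iterator holds none.
func (s *sliceIter[T]) Close() {}

var _ bulk.Iterator[string] = (*sliceIter[string])(nil)

func main() {
	it := &sliceIter[string]{elems: []string{"a", "b", "c"}}
	defer it.Close()

	// The consumption loop used throughout the backupccl changes above:
	// advance with Next(), and gate every Value() access behind Valid().
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			panic(err)
		} else if !ok {
			break
		}
		fmt.Println(it.Value())
	}

	// CollectToSlice drains a fresh iterator into a slice in one call.
	vals, err := bulk.CollectToSlice[int](&sliceIter[int]{elems: []int{1, 2, 3}})
	if err != nil {
		panic(err)
	}
	fmt.Println(vals) // [1 2 3]
}

Returning a zero value from an exhausted Value() mirrors the behavior exercised by checkIteratorOperations and checkEmptyIteratorOperations above, which require that Value() neither errors nor panics after the last element.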