release-22.2: move alloc-heavy Files field from manifest to SST, use slim manifest in backup restore #97210

3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
"backup_telemetry.go",
"create_scheduled_backup.go",
"file_sst_sink.go",
"generative_split_and_scatter_processor.go",
"key_rewriter.go",
"restoration_data.go",
"restore_data_processor.go",
@@ -165,6 +166,7 @@ go_test(
"create_scheduled_backup_test.go",
"datadriven_test.go",
"full_cluster_backup_restore_test.go",
"generative_split_and_scatter_processor_test.go",
"key_rewriter_test.go",
"main_test.go",
"partitioned_backup_test.go",
@@ -264,6 +266,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
55 changes: 44 additions & 11 deletions pkg/ccl/backupccl/backup_job.go
@@ -140,14 +140,30 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
for _, file := range backupManifest.Files {
if file.StartTime.IsEmpty() && !file.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, file.Span)
iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv)
it, err := iterFactory.NewFileIter(ctx)
if err != nil {
return roachpb.RowCount{}, err
}
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
return roachpb.RowCount{}, err
} else if !ok {
break
}

f := it.Value()
if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, f.Span)
} else {
completedSpans = append(completedSpans, file.Span)
completedSpans = append(completedSpans, f.Span)
}
}

@@ -172,10 +188,6 @@ func backup(
return roachpb.RowCount{}, errors.Wrap(err, "failed to determine nodes on which to run")
}

kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)

backupSpecs, err := distBackupPlanSpecs(
ctx,
planCtx,
@@ -319,11 +331,26 @@ func backup(
}
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
// Write a `BACKUP_MANIFEST` file to support backups in mixed-version clusters
// with 22.2 nodes.
//
// TODO(adityamaru): We can stop writing `BACKUP_MANIFEST` in 23.2
// because a mixed-version cluster with 23.1 nodes will read the
// `BACKUP_METADATA` instead.
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.BackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}

// Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy
// fields elided from the `BACKUP_MANIFEST`.
if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) {
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
}

var tableStatistics []*stats.TableStatisticProto
for i := range backupManifest.Descriptors {
if tbl, _, _, _, _ := descpb.GetDescriptors(&backupManifest.Descriptors[i]); tbl != nil {
@@ -350,7 +377,6 @@ func backup(
Statistics: tableStatistics,
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
return roachpb.RowCount{}, err
}
@@ -897,12 +923,19 @@ func getBackupDetailAndManifest(
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

backupManifest, err := createBackupManifest(
ctx,
execCfg,
txn,
updatedDetails,
prevBackups)
prevBackups,
layerToIterFactory,
)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
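The backup_job.go hunks above replace direct iteration over `backupManifest.Files` with an iterator obtained from `backupinfo.NewIterFactory(...).NewFileIter(ctx)`, driven through a `Valid`/`Value`/`Next`/`Close` loop. Below is a minimal, self-contained Go sketch of that consumption pattern as the diff shows it; the `span`, `file`, and `fileIter` types are hypothetical stand-ins for `roachpb.Span`, `backuppb.BackupManifest_File`, and the real backupinfo iterator, modeling only the fields the loop reads.

```go
package main

import "fmt"

// span and file are illustrative stand-ins; only the fields used by the
// completed-span bookkeeping loop are modeled here.
type span struct{ start, end string }

type file struct {
	span       span
	startEmpty bool // would be file.StartTime.IsEmpty() in the real code
	endEmpty   bool // would be file.EndTime.IsEmpty() in the real code
}

// fileIter mimics the Valid/Value/Next/Close protocol of the iterator
// returned by backupinfo.NewIterFactory(...).NewFileIter(ctx).
type fileIter struct {
	files []file
	pos   int
}

func (it *fileIter) Valid() (bool, error) { return it.pos < len(it.files), nil }
func (it *fileIter) Value() file          { return it.files[it.pos] }
func (it *fileIter) Next()                { it.pos++ }
func (it *fileIter) Close()               {}

func main() {
	it := &fileIter{files: []file{
		{span: span{"a", "b"}, startEmpty: true, endEmpty: false},  // introduced span
		{span: span{"c", "d"}, startEmpty: false, endEmpty: false}, // regular span
	}}
	defer it.Close()

	var completedSpans, completedIntroducedSpans []span
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			panic(err)
		} else if !ok {
			break
		}
		f := it.Value()
		// A file with an empty start time but a non-empty end time covers an
		// introduced span, matching the check in the diff above.
		if f.startEmpty && !f.endEmpty {
			completedIntroducedSpans = append(completedIntroducedSpans, f.span)
		} else {
			completedSpans = append(completedSpans, f.span)
		}
	}
	fmt.Println(len(completedSpans), len(completedIntroducedSpans)) // 1 1
}
```

The key design point visible in the diff is that callers no longer need the whole `Files` slice materialized in memory; they stream file entries from the external SST one at a time.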
130 changes: 70 additions & 60 deletions pkg/ccl/backupccl/backup_metadata_test.go
@@ -26,8 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -36,6 +36,8 @@ import (
"github.com/stretchr/testify/require"
)

// TestMetadataSST has to be in backupccl_test in order to be sure that the
// BACKUP planhook is registered.
func TestMetadataSST(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -112,14 +114,8 @@ func checkMetadata(
}

checkManifest(t, m, bm)
// If there are descriptor changes, we only check those as they should have
// all changes as well as existing descriptors
if len(m.DescriptorChanges) > 0 {
checkDescriptorChanges(ctx, t, m, bm)
} else {
checkDescriptors(ctx, t, m, bm)
}

checkDescriptorChanges(ctx, t, m, bm)
checkDescriptors(ctx, t, m, bm)
checkSpans(ctx, t, m, bm)
// Don't check introduced spans on the first backup.
if m.StartTime != (hlc.Timestamp{}) {
@@ -147,16 +143,17 @@ func checkDescriptors(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaDescs []descpb.Descriptor
var desc descpb.Descriptor

it := bm.DescIter(ctx)
it := bm.NewDescIter(ctx)
defer it.Close()
for it.Next(&desc) {
metaDescs = append(metaDescs, desc)
}
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

if it.Err() != nil {
t.Fatal(it.Err())
metaDescs = append(metaDescs, *it.Value())
}

require.Equal(t, m.Descriptors, metaDescs)
@@ -166,15 +163,16 @@ func checkDescriptorChanges(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaRevs []backuppb.BackupManifest_DescriptorRevision
var rev backuppb.BackupManifest_DescriptorRevision
it := bm.DescriptorChangesIter(ctx)
it := bm.NewDescriptorChangesIter(ctx)
defer it.Close()

for it.Next(&rev) {
metaRevs = append(metaRevs, rev)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaRevs = append(metaRevs, *it.Value())
}

// Descriptor Changes are sorted by time in the manifest.
@@ -189,15 +187,22 @@ func checkFiles(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaFiles []backuppb.BackupManifest_File
var file backuppb.BackupManifest_File
it := bm.FileIter(ctx)
it, err := bm.NewFileIter(ctx)
if err != nil {
t.Fatal(err)
}
defer it.Close()

for it.Next(&file) {
metaFiles = append(metaFiles, file)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
ok, err := it.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}

metaFiles = append(metaFiles, *it.Value())
}

require.Equal(t, m.Files, metaFiles)
@@ -207,15 +212,17 @@ func checkSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.SpanIter(ctx)
it := bm.NewSpanIter(ctx)
defer it.Close()

for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.Spans, metaSpans)
@@ -225,14 +232,16 @@ func checkIntroducedSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.IntroducedSpanIter(ctx)
it := bm.NewIntroducedSpanIter(ctx)
defer it.Close()
for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.IntroducedSpans, metaSpans)
@@ -242,15 +251,17 @@ func checkTenants(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaTenants []descpb.TenantInfoWithUsage
var tenant descpb.TenantInfoWithUsage
it := bm.TenantIter(ctx)
it := bm.NewTenantIter(ctx)
defer it.Close()

for it.Next(&tenant) {
metaTenants = append(metaTenants, tenant)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaTenants = append(metaTenants, it.Value())
}

require.Equal(t, m.Tenants, metaTenants)
@@ -268,18 +279,17 @@ func checkStats(
if err != nil {
t.Fatal(err)
}
if len(expectedStats) == 0 {
expectedStats = nil
}

var metaStats = make([]*stats.TableStatisticProto, 0)
var s *stats.TableStatisticProto
it := bm.StatsIter(ctx)
it := bm.NewStatsIter(ctx)
defer it.Close()

for it.Next(&s) {
metaStats = append(metaStats, s)
}
if it.Err() != nil {
t.Fatal(it.Err())
metaStats, err := bulk.CollectToSlice(it)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedStats, metaStats)
}

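In the test diff above, the helpers move from the old `it.Next(&dst)` / `it.Err()` API to the same `Valid`/`Next`/`Value` protocol, and `checkStats` collapses the loop with `bulk.CollectToSlice`. A minimal sketch of what such a generic collector could look like under that protocol follows; the `Iterator` interface, `collectToSlice`, and `sliceIter` here are illustrative assumptions, not the actual `pkg/util/bulk` code.

```go
package main

import "fmt"

// Iterator mirrors the Valid/Value/Next/Close protocol used by the new
// backupinfo iterators in this PR. Simplified stand-in for illustration.
type Iterator[T any] interface {
	Valid() (bool, error)
	Value() T
	Next()
	Close()
}

// collectToSlice drains an iterator into a slice, the way checkStats uses
// bulk.CollectToSlice above. Sketch only; the real helper may differ.
func collectToSlice[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for ; ; it.Next() {
		ok, err := it.Valid()
		if err != nil {
			return nil, err
		}
		if !ok {
			break
		}
		out = append(out, it.Value())
	}
	return out, nil
}

// sliceIter is a trivial in-memory iterator used to exercise the helper.
type sliceIter[T any] struct {
	elems []T
	pos   int
}

func (s *sliceIter[T]) Valid() (bool, error) { return s.pos < len(s.elems), nil }
func (s *sliceIter[T]) Value() T             { return s.elems[s.pos] }
func (s *sliceIter[T]) Next()                { s.pos++ }
func (s *sliceIter[T]) Close()               {}

func main() {
	it := &sliceIter[string]{elems: []string{"span-a", "span-b"}}
	defer it.Close()
	vals, err := collectToSlice(it)
	if err != nil {
		panic(err)
	}
	fmt.Println(vals) // [span-a span-b]
}
```

The remaining helpers keep the explicit loop because they compare element-by-element against the manifest slices, but the `Valid`-then-`Value` shape is identical in each.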