backupccl: move descriptors and descriptor changes field from manifest to SST

As part of an effort to make backup manifests scale better for larger clusters,
this patch moves descriptors and descriptor changes from the manifest to an
external SST. This avoids the need to allocate enough memory to hold every
descriptor and descriptor revision for every layer of a backup during a backup
or restore job. This patch also changes the access pattern for descriptors and
descriptor changes to use iterators, so that they can be accessed in a
streaming manner from the external SST.

Release note: None
Rui Hu committed Feb 16, 2023
1 parent 677cac6 commit e12b1f1
Showing 23 changed files with 1,517 additions and 577 deletions.
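
A note on the access pattern: throughout the diff below, descriptors, descriptor revisions, spans, tenants, and stats are all consumed through the same Valid/Value/Next/Close loop. The following is a minimal sketch of that iterator shape, inferred from the call sites; the actual interface lives in pkg/util/bulk and may differ:

```go
// Iterator sketches the iterator shape assumed by the call sites in this
// commit; the names here are inferred, not the real pkg/util/bulk definition.
type Iterator[T any] interface {
	// Valid reports whether the iterator is positioned on an element,
	// or returns an error if iteration failed.
	Valid() (bool, error)
	// Value returns the current element; call only while Valid is true.
	Value() T
	// Next advances the iterator to the next element.
	Next()
	// Close releases underlying resources, such as the external SST reader.
	Close()
}

// drain shows the canonical consumption loop used throughout this diff.
func drain[T any](it Iterator[T]) ([]T, error) {
	defer it.Close()
	var out []T
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}
		out = append(out, it.Value())
	}
	return out, nil
}
```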
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -266,6 +266,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
39 changes: 24 additions & 15 deletions pkg/ccl/backupccl/backup_job.go
@@ -146,22 +146,26 @@ func backup(
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
it, err := makeBackupManifestFileIterator(ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage,
*backupManifest, encryption, &kmsEnv)
iterFactory := backupinfo.NewIterFactory(backupManifest, defaultStore, encryption, &kmsEnv)
it, err := iterFactory.NewFileIter(ctx)
if err != nil {
return roachpb.RowCount{}, err
}
defer it.close()
for f, hasNext := it.next(); hasNext; f, hasNext = it.next() {
defer it.Close()
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
return roachpb.RowCount{}, err
} else if !ok {
break
}

f := it.Value()
if f.StartTime.IsEmpty() && !f.EndTime.IsEmpty() {
completedIntroducedSpans = append(completedIntroducedSpans, f.Span)
} else {
completedSpans = append(completedSpans, f.Span)
}
}
if it.err() != nil {
return roachpb.RowCount{}, it.err()
}

// Subtract out any completed spans.
spans := filterSpans(backupManifest.Spans, completedSpans)
@@ -340,13 +344,11 @@ func backup(

// Write a `BACKUP_METADATA` file along with SSTs for all the alloc heavy
// fields elided from the `BACKUP_MANIFEST`.
//
// TODO(adityamaru,rhu713): Once backup/restore switches from writing and
// reading backup manifests to `metadata.sst` we can stop writing the slim
// manifest.
if err := backupinfo.WriteFilesListMetadataWithSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
if backupinfo.WriteMetadataWithFilesSST.Get(&settings.SV) {
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
}

var tableStatistics []*stats.TableStatisticProto
@@ -921,12 +923,19 @@ func getBackupDetailAndManifest(
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, prevBackups, baseEncryptionOptions, &kmsEnv)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}

backupManifest, err := createBackupManifest(
ctx,
execCfg,
txn,
updatedDetails,
prevBackups)
prevBackups,
layerToIterFactory,
)
if err != nil {
return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err
}
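
The new write path above is gated on the backupinfo.WriteMetadataWithFilesSST cluster setting. As a sketch of what such a gate looks like (the setting key, class, and default below are assumptions; the real definition lives in the backupinfo package):

```go
import "github.com/cockroachdb/cockroach/pkg/settings"

// WriteMetadataWithFilesSST gates writing the BACKUP_METADATA file and its
// accompanying external SSTs alongside the legacy manifest. Sketch only:
// the key and default here are assumed, not taken from the source.
var WriteMetadataWithFilesSST = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"backup.write_metadata_with_files_sst.enabled",
	"write backup metadata and its alloc-heavy fields to external SSTs",
	false,
)
```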
109 changes: 56 additions & 53 deletions pkg/ccl/backupccl/backup_metadata_test.go
@@ -26,8 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -36,6 +36,8 @@ import (
"github.com/stretchr/testify/require"
)

// TestMetadataSST has to be in backupccl_test in order to be sure that the
// BACKUP planhook is registered.
func TestMetadataSST(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -112,14 +114,8 @@ func checkMetadata(
}

checkManifest(t, m, bm)
// If there are descriptor changes, we only check those as they should have
// all changes as well as existing descriptors
if len(m.DescriptorChanges) > 0 {
checkDescriptorChanges(ctx, t, m, bm)
} else {
checkDescriptors(ctx, t, m, bm)
}

checkDescriptorChanges(ctx, t, m, bm)
checkDescriptors(ctx, t, m, bm)
checkSpans(ctx, t, m, bm)
// Don't check introduced spans on the first backup.
if m.StartTime != (hlc.Timestamp{}) {
@@ -147,16 +143,17 @@ func checkDescriptors(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaDescs []descpb.Descriptor
var desc descpb.Descriptor

it := bm.DescIter(ctx)
it := bm.NewDescIter(ctx)
defer it.Close()
for it.Next(&desc) {
metaDescs = append(metaDescs, desc)
}
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

if it.Err() != nil {
t.Fatal(it.Err())
metaDescs = append(metaDescs, *it.Value())
}

require.Equal(t, m.Descriptors, metaDescs)
@@ -166,15 +163,16 @@ func checkDescriptorChanges(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaRevs []backuppb.BackupManifest_DescriptorRevision
var rev backuppb.BackupManifest_DescriptorRevision
it := bm.DescriptorChangesIter(ctx)
it := bm.NewDescriptorChangesIter(ctx)
defer it.Close()

for it.Next(&rev) {
metaRevs = append(metaRevs, rev)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaRevs = append(metaRevs, *it.Value())
}

// Descriptor Changes are sorted by time in the manifest.
@@ -214,15 +212,17 @@ func checkSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.SpanIter(ctx)
it := bm.NewSpanIter(ctx)
defer it.Close()

for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.Spans, metaSpans)
@@ -232,14 +232,16 @@ func checkIntroducedSpans(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaSpans []roachpb.Span
var span roachpb.Span
it := bm.IntroducedSpanIter(ctx)
it := bm.NewIntroducedSpanIter(ctx)
defer it.Close()
for it.Next(&span) {
metaSpans = append(metaSpans, span)
}
if it.Err() != nil {
t.Fatal(it.Err())

for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}
metaSpans = append(metaSpans, it.Value())
}

require.Equal(t, m.IntroducedSpans, metaSpans)
@@ -249,15 +251,17 @@ func checkTenants(
ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
) {
var metaTenants []descpb.TenantInfoWithUsage
var tenant descpb.TenantInfoWithUsage
it := bm.TenantIter(ctx)
it := bm.NewTenantIter(ctx)
defer it.Close()

for it.Next(&tenant) {
metaTenants = append(metaTenants, tenant)
}
if it.Err() != nil {
t.Fatal(it.Err())
for ; ; it.Next() {
if ok, err := it.Valid(); err != nil {
t.Fatal(err)
} else if !ok {
break
}

metaTenants = append(metaTenants, it.Value())
}

require.Equal(t, m.Tenants, metaTenants)
@@ -275,18 +279,17 @@ func checkStats(
if err != nil {
t.Fatal(err)
}
if len(expectedStats) == 0 {
expectedStats = nil
}

var metaStats = make([]*stats.TableStatisticProto, 0)
var s *stats.TableStatisticProto
it := bm.StatsIter(ctx)
it := bm.NewStatsIter(ctx)
defer it.Close()

for it.Next(&s) {
metaStats = append(metaStats, s)
}
if it.Err() != nil {
t.Fatal(it.Err())
metaStats, err := bulk.CollectToSlice(it)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedStats, metaStats)
}

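
checkStats now drains its iterator with bulk.CollectToSlice instead of a hand-rolled loop. A plausible generic implementation, assuming the Iterator shape sketched near the top of this page (the real helper lives in pkg/util/bulk and its signature may differ):

```go
// CollectToSlice drains an iterator into a slice, stopping at the first
// iteration error. It does not close the iterator; the caller does, as in
// checkStats above. Sketch only; inferred from the call site.
func CollectToSlice[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for ; ; it.Next() {
		if ok, err := it.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}
		out = append(out, it.Value())
	}
	return out, nil
}
```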
45 changes: 36 additions & 9 deletions pkg/ccl/backupccl/backup_planning.go
@@ -1053,6 +1053,7 @@ func getReintroducedSpans(
ctx context.Context,
execCfg *sql.ExecutorConfig,
prevBackups []backuppb.BackupManifest,
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
tables []catalog.TableDescriptor,
revs []backuppb.BackupManifest_DescriptorRevision,
endTime hlc.Timestamp,
@@ -1078,12 +1079,21 @@
// at backup time, we must find all tables in manifest.DescriptorChanges whose
// last change brought the table offline.
offlineInLastBackup := make(map[descpb.ID]struct{})
lastBackup := prevBackups[len(prevBackups)-1]
lastIterFactory := layerToIterFactory[len(prevBackups)-1]

descIt := lastIterFactory.NewDescIter(ctx)
defer descIt.Close()

for ; ; descIt.Next() {
if ok, err := descIt.Valid(); err != nil {
return nil, err
} else if !ok {
break
}

for _, desc := range lastBackup.Descriptors {
// TODO(pbardea): Also check that lastWriteTime is set once those are
// populated on the table descriptor.
if table, _, _, _, _ := descpb.GetDescriptors(&desc); table != nil && table.Offline() {
if table, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); table != nil && table.Offline() {
offlineInLastBackup[table.GetID()] = struct{}{}
}
}
@@ -1093,8 +1103,16 @@
// change in the previous backup interval put the table offline, then that
// backup was offline at the endTime of the last backup.
latestTableDescChangeInLastBackup := make(map[descpb.ID]*descpb.TableDescriptor)
for _, rev := range lastBackup.DescriptorChanges {
if table, _, _, _, _ := descpb.GetDescriptors(rev.Desc); table != nil {
descRevIt := lastIterFactory.NewDescriptorChangesIter(ctx)
defer descRevIt.Close()
for ; ; descRevIt.Next() {
if ok, err := descRevIt.Valid(); err != nil {
return nil, err
} else if !ok {
break
}

if table, _, _, _, _ := descpb.GetDescriptors(descRevIt.Value().Desc); table != nil {
if trackedRev, ok := latestTableDescChangeInLastBackup[table.GetID()]; !ok {
latestTableDescChangeInLastBackup[table.GetID()] = table
} else if trackedRev.Version < table.Version {
@@ -1341,6 +1359,7 @@ func createBackupManifest(
txn *kv.Txn,
jobDetails jobspb.BackupDetails,
prevBackups []backuppb.BackupManifest,
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
) (backuppb.BackupManifest, error) {
mvccFilter := backuppb.MVCCFilter_Latest
if jobDetails.RevisionHistory {
@@ -1414,9 +1433,17 @@
if len(prevBackups) > 0 {
tablesInPrev := make(map[descpb.ID]struct{})
dbsInPrev := make(map[descpb.ID]struct{})
rawDescs := prevBackups[len(prevBackups)-1].Descriptors
for i := range rawDescs {
if t, _, _, _, _ := descpb.GetDescriptors(&rawDescs[i]); t != nil {

descIt := layerToIterFactory[len(prevBackups)-1].NewDescIter(ctx)
defer descIt.Close()
for ; ; descIt.Next() {
if ok, err := descIt.Valid(); err != nil {
return backuppb.BackupManifest{}, err
} else if !ok {
break
}

if t, _, _, _, _ := descpb.GetDescriptors(descIt.Value()); t != nil {
tablesInPrev[t.ID] = struct{}{}
}
}
@@ -1437,7 +1464,7 @@

newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans)

reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime)
reintroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, layerToIterFactory, tables, revs, endTime)
if err != nil {
return backuppb.BackupManifest{}, err
}
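
Both call sites above index layerToIterFactory with len(prevBackups)-1 to stream the most recent layer's descriptors instead of materializing them in memory. A condensed sketch of that pattern; the map shape of LayerToBackupManifestFileIterFactory and the helper below are assumptions based on how the diff uses them:

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// lastLayerDescriptors streams the descriptors of the most recent backup
// layer. Hypothetical helper illustrating the access pattern used in
// getReintroducedSpans and createBackupManifest above.
func lastLayerDescriptors(
	ctx context.Context,
	// Assumed shape of backupinfo.LayerToBackupManifestFileIterFactory.
	layerToIterFactory map[int]*backupinfo.IterFactory,
	numLayers int,
) ([]descpb.Descriptor, error) {
	descIt := layerToIterFactory[numLayers-1].NewDescIter(ctx)
	defer descIt.Close()

	var descs []descpb.Descriptor
	for ; ; descIt.Next() {
		if ok, err := descIt.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}
		descs = append(descs, *descIt.Value())
	}
	return descs, nil
}
```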