ccl/backupccl: add new split and scatter processor that generates import spans

Previously, restore created all of its import spans up front and stored them
in memory. The size of these import spans on restores of large backups with
many incremental chains has caused OOMs, which have been the source of many
escalations. This patch modifies import span creation so that import spans are
generated one at a time. This span generator is then used in the split and
scatter processor to produce the import spans consumed by the rest of restore,
instead of having the spans specified in the processor's spec. A future patch
will add memory monitoring to the import span generation to further safeguard
against OOMs in restore.

This patch also changes the import span generation algorithm. It introduces
the cluster setting `bulkio.restore.use_simple_import_spans`, which, if set to
true, reverts the algorithm back to makeSimpleImportSpans.

Release note: None
Rui Hu committed Jan 24, 2023
1 parent dc974cd commit f3e0327
Showing 15 changed files with 1,236 additions and 224 deletions.
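
In outline, the change turns import span creation into a producer/consumer
pipeline: a generator goroutine streams one span entry at a time into a bounded
channel that the split and scatter processor drains. The sketch below is a
minimal illustration, not the committed code: restoreSpanEntry and
generateImportSpans are hypothetical stand-ins for execinfrapb.RestoreSpanEntry
and generateAndSendImportSpans (see the bench_covering_test.go diff below for
the real wiring), and errgroup stands in for the internal ctxgroup helper.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// restoreSpanEntry is a hypothetical stand-in for execinfrapb.RestoreSpanEntry.
type restoreSpanEntry struct{ span string }

// generateImportSpans is a hypothetical stand-in for generateAndSendImportSpans:
// it produces entries one at a time and closes the channel when done, so the
// full set of import spans is never materialized in memory at once.
func generateImportSpans(ctx context.Context, spanCh chan<- restoreSpanEntry) error {
	defer close(spanCh)
	for _, s := range []string{"/Table/100", "/Table/101", "/Table/102"} {
		select {
		case spanCh <- restoreSpanEntry{span: s}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	spanCh := make(chan restoreSpanEntry, 1000) // bounded buffer, as in the benchmark

	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error { return generateImportSpans(gCtx, spanCh) })

	// The consumer (the generative split and scatter processor in the real
	// code) drains entries as they are produced.
	for entry := range spanCh {
		fmt.Println("processing", entry.span)
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
}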
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
         "backup_telemetry.go",
         "create_scheduled_backup.go",
         "file_sst_sink.go",
+        "generative_split_and_scatter_processor.go",
         "key_rewriter.go",
         "restoration_data.go",
         "restore_data_processor.go",
16 changes: 10 additions & 6 deletions pkg/ccl/backupccl/backup_metadata_test.go
@@ -187,18 +187,22 @@ func checkFiles(
 	ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
 ) {
 	var metaFiles []backuppb.BackupManifest_File
-	var file backuppb.BackupManifest_File
 	it, err := bm.NewFileIter(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer it.Close()
 
-	for it.Next(&file) {
-		metaFiles = append(metaFiles, file)
-	}
-	if it.Err() != nil {
-		t.Fatal(it.Err())
+	for ; ; it.Next() {
+		ok, err := it.Valid()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !ok {
+			break
+		}
+
+		metaFiles = append(metaFiles, *it.Value())
 	}
 
 	require.Equal(t, m.Files, metaFiles)
77 changes: 48 additions & 29 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
@@ -262,6 +262,14 @@ func writeDescsToMetadata(
 	return nil
 }
 
+// FileCmp compares two BackupManifest_File entries, ordering first by span
+// start key and then by path.
+func FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int {
+	if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 {
+		return cmp
+	}
+
+	return strings.Compare(left.Path, right.Path)
+}
+
 func writeFilesSST(
 	ctx context.Context,
 	m *backuppb.BackupManifest,
@@ -280,8 +288,7 @@ func writeFilesSST(
 
 	// Sort and write all of the files into a single file info SST.
 	sort.Slice(m.Files, func(i, j int) bool {
-		cmp := m.Files[i].Span.Key.Compare(m.Files[j].Span.Key)
-		return cmp < 0 || (cmp == 0 && strings.Compare(m.Files[i].Path, m.Files[j].Path) < 0)
+		return FileCmp(m.Files[i], m.Files[j]) < 0
 	})
 
 	for i := range m.Files {
@@ -1015,10 +1022,11 @@ func (si *SpanIterator) Next(span *roachpb.Span) bool {
 	return false
 }
 
-// FileIterator is a simple iterator to iterate over stats.TableStatisticProtos.
+// FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File.
 type FileIterator struct {
 	mergedIterator storage.SimpleMVCCIterator
 	err            error
+	file           *backuppb.BackupManifest_File
 }
 
 // NewFileIter creates a new FileIterator for the backup metadata.
@@ -1076,40 +1084,51 @@ func (fi *FileIterator) Close() {
 	fi.mergedIterator.Close()
 }
 
-// Err returns the iterator's error.
-func (fi *FileIterator) Err() error {
-	return fi.err
-}
-
-// Next retrieves the next file in the iterator.
-//
-// Next returns true if next element was successfully unmarshalled into file,
-// and false if there are no more elements or if an error was encountered. When
-// Next returns false, the user should call the Err method to verify the
-// existence of an error.
-func (fi *FileIterator) Next(file *backuppb.BackupManifest_File) bool {
+// Valid indicates whether or not the iterator is pointing to a valid value.
+func (fi *FileIterator) Valid() (bool, error) {
 	if fi.err != nil {
-		return false
+		return false, fi.err
 	}
 
-	valid, err := fi.mergedIterator.Valid()
-	if err != nil || !valid {
-		fi.err = err
-		return false
-	}
-	v, err := fi.mergedIterator.UnsafeValue()
-	if err != nil {
+	if ok, err := fi.mergedIterator.Valid(); !ok {
 		fi.err = err
-		return false
+		return ok, err
 	}
-	err = protoutil.Unmarshal(v, file)
-	if err != nil {
-		fi.err = err
-		return false
+
+	if fi.file == nil {
+		v, err := fi.mergedIterator.UnsafeValue()
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+
+		file := &backuppb.BackupManifest_File{}
+		err = protoutil.Unmarshal(v, file)
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+		fi.file = file
 	}
+	return true, nil
+}
+
+// Value returns the current value of the iterator, if valid.
+func (fi *FileIterator) Value() *backuppb.BackupManifest_File {
+	return fi.file
+}
+
+// Next advances the iterator to the next value.
+func (fi *FileIterator) Next() {
 	fi.mergedIterator.Next()
-	return true
+	fi.file = nil
 }
 
 // Reset resets the iterator to the first value.
 func (fi *FileIterator) Reset() {
 	fi.mergedIterator.SeekGE(storage.MVCCKey{})
 	fi.err = nil
+	fi.file = nil
 }
 
 // DescIterator is a simple iterator to iterate over descpb.Descriptors.
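
The diff above replaces FileIterator's Next(*File) bool / Err() pair with the
Valid/Value/Next convention used by the underlying storage iterators. A minimal
consumer sketch of the new contract, mirroring the updated checkFiles test
above; consumeFiles and process are hypothetical names:

// consumeFiles drains a backupinfo.FileIterator using the Valid/Value/Next
// contract introduced by this commit.
func consumeFiles(it *backupinfo.FileIterator, process func(backuppb.BackupManifest_File)) error {
	defer it.Close()
	for ; ; it.Next() {
		ok, err := it.Valid()
		if err != nil {
			return err // Valid surfaces both iterator and unmarshal errors.
		}
		if !ok {
			break // Exhausted.
		}
		// Value returns a pointer to the iterator's current file; dereference
		// to copy it out before advancing.
		process(*it.Value())
	}
	return nil
}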
8 changes: 7 additions & 1 deletion pkg/ccl/backupccl/backuprand/backup_rand_test.go
@@ -33,7 +33,9 @@ import (
 // randomly generated tables and verifies their data and schema are preserved.
 // It tests that full database backup as well as all subsets of per-table backup
 // roundtrip properly. 50% of the time, the test runs the restore with the
-// schema_only parameter, which does not restore any rows from user tables.
+// schema_only parameter, which does not restore any rows from user tables. The
+// test will also run with bulkio.restore.use_simple_import_spans set to true
+// 50% of the time.
 func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -72,6 +74,10 @@ func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
 		runSchemaOnlyExtension = ", schema_only"
 	}
 
+	if rng.Intn(2) == 0 {
+		sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.use_simple_import_spans = true")
+	}
+
 	tables := sqlDB.Query(t, `SELECT name FROM crdb_internal.tables WHERE
 		database_name = 'rand' AND schema_name = 'public'`)
 	var tableNames []string
24 changes: 20 additions & 4 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -14,6 +14,8 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/stretchr/testify/require"
@@ -27,6 +29,7 @@ func BenchmarkCoverageChecks(b *testing.B) {
 	r, _ := randutil.NewTestRand()
 
 	for _, numBackups := range []int{1, 7, 24, 24 * 4} {
+		numBackups := numBackups
 		b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
 			for _, numSpans := range []int{10, 20, 100} {
 				b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) {
@@ -61,6 +64,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
 	ctx := context.Background()
 	r, _ := randutil.NewTestRand()
 	for _, numBackups := range []int{1, 2, 24, 24 * 4} {
+		numBackups := numBackups
 		b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
 			for _, baseFiles := range []int{0, 100, 10000} {
 				b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) {
@@ -82,10 +86,22 @@
 				layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg,
 					backups, nil, nil)
 				require.NoError(b, err)
-				cov, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups,
-					layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier,
-					nil, 0)
-				require.NoError(b, err)
+
+				spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
+
+				g := ctxgroup.WithContext(ctx)
+				g.GoCtx(func(ctx context.Context) error {
+					defer close(spanCh)
+					return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups,
+						layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false)
+				})
+
+				var cov []execinfrapb.RestoreSpanEntry
+				for entry := range spanCh {
+					cov = append(cov, entry)
+				}
+
+				require.NoError(b, g.Wait())
 				b.ReportMetric(float64(len(cov)), "coverSize")
 			}
 		})
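
Both benchmarks pin the loop variable (numBackups := numBackups) before handing
it to b.Run. Prior to Go 1.22, a for-range loop reused a single variable across
iterations, so closures that outlive an iteration would otherwise all observe
the final value. A standalone illustration of the idiom, unrelated to the
restore code itself:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, n := range []int{1, 7, 24} {
		n := n // pin: give each closure its own copy of n
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(n) // prints 1, 7, 24 (in some order), not 24 three times
		}()
	}
	wg.Wait()
}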
[Diffs for the remaining 10 changed files are not shown.]
