diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index ae8b05c51f1f..178f54abe4c6 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -166,6 +166,7 @@ go_test(
         "create_scheduled_backup_test.go",
         "datadriven_test.go",
         "full_cluster_backup_restore_test.go",
+        "generative_split_and_scatter_processor_test.go",
         "key_rewriter_test.go",
         "main_test.go",
         "partitioned_backup_test.go",
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index afc9877a3467..6fa6318d3a6a 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -270,6 +270,8 @@ func TestBackupRestoreJobTagAndLabel(t *testing.T) {
 
 	backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
 
+	mu.Lock()
+	defer mu.Unlock()
 	require.True(t, found)
 }
 
diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
index 98b491c6f39b..7a3c2e77e83f 100644
--- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
+++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -317,14 +317,22 @@ func runGenerativeSplitAndScatter(
 			idx++
 			if len(chunk.entries) == int(spec.ChunkSize) {
 				chunk.splitKey = entry.Span.Key
-				restoreEntryChunksCh <- chunk
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case restoreEntryChunksCh <- chunk:
+				}
 				chunk = restoreEntryChunk{}
 			}
 			chunk.entries = append(chunk.entries, entry)
 		}
 
 		if len(chunk.entries) > 0 {
-			restoreEntryChunksCh <- chunk
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case restoreEntryChunksCh <- chunk:
+			}
 		}
 		return nil
 	})
@@ -417,6 +425,12 @@ func runGenerativeSplitAndScatter(
 			node: chunkDestination,
 		}
 
+		if restoreKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+			if restoreKnobs.RunAfterSplitAndScatteringEntry != nil {
+				restoreKnobs.RunAfterSplitAndScatteringEntry(ctx)
+			}
+		}
+
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
new file mode 100644
index 000000000000..a19d53bce1d6
--- /dev/null
+++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
@@ -0,0 +1,140 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package backupccl
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestRunGenerativeSplitAndScatterContextCancel verifies that
+// runGenerativeSplitAndScatter can be interrupted by canceling the supplied
+// context. This test would time out if the context cancellation did not
+// interrupt the function.
+func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numAccounts = 1000
+	const localFoo = "nodelocal://0/foo"
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts,
+		InitManualReplication)
+	defer cleanupFn()
+
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := eval.MakeTestingEvalContext(st)
+
+	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
+	defer testDiskMonitor.Stop(ctx)
+
+	// Set up the test so that the test context is canceled after the first entry
+	// has been processed by the generative split and scatterer.
+	s0 := tc.Server(0)
+	registry := tc.Server(0).JobRegistry().(*jobs.Registry)
+	execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
+	flowCtx := execinfra.FlowCtx{
+		Cfg: &execinfra.ServerConfig{
+			Settings:       st,
+			DB:             s0.InternalDB().(descs.DB),
+			JobRegistry:    registry,
+			ExecutorConfig: &execCfg,
+			TestingKnobs: execinfra.TestingKnobs{
+				BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
+					RunAfterSplitAndScatteringEntry: func(ctx context.Context) {
+						cancel()
+					},
+				},
+			},
+		},
+		EvalCtx:     &evalCtx,
+		Mon:         evalCtx.TestingMon,
+		DiskMonitor: testDiskMonitor,
+		NodeID:      evalCtx.NodeID,
+	}
+
+	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
+	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)
+
+	backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
+	require.Equal(t, 1, len(backups))
+	uri := localFoo + "/" + backups[0][0]
+
+	codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
+	backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
+	backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key
+
+	spec := makeTestingGenerativeSplitAndScatterSpec(
+		[]string{uri},
+		[]roachpb.Span{{
+			Key:    backupStartKey,
+			EndKey: backupStartKey.PrefixEnd(),
+		}},
+	)
+
+	oldID := backupTableDesc.GetID()
+	newID := backupTableDesc.GetID() + 1
+	newDesc := protoutil.Clone(backupTableDesc.TableDesc()).(*descpb.TableDescriptor)
+	newDesc.ID = newID
+	tableRekeys := []execinfrapb.TableRekey{
+		{
+			OldID:   uint32(oldID),
+			NewDesc: mustMarshalDesc(t, newDesc),
+		},
+	}
+
+	kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, tableRekeys, nil, false)
+	require.NoError(t, err)
+
+	chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
+	chunkEntrySplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
+
+	// Large enough so doneScatterCh never blocks.
+	doneScatterCh := make(chan entryNode, 1000)
+	err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySplitScatterers, doneScatterCh)
+
+	require.ErrorContains(t, err, "context canceled")
+}
+
+func makeTestingGenerativeSplitAndScatterSpec(
+	backupURIs []string, requiredSpans []roachpb.Span,
+) execinfrapb.GenerativeSplitAndScatterSpec {
+	return execinfrapb.GenerativeSplitAndScatterSpec{
+		ValidateOnly:         false,
+		URIs:                 backupURIs,
+		Encryption:           nil,
+		EndTime:              hlc.Timestamp{},
+		Spans:                requiredSpans,
+		BackupLocalityInfo:   nil,
+		HighWater:            nil,
+		UserProto:            "",
+		ChunkSize:            1,
+		TargetSize:           1,
+		NumEntries:           1,
+		NumNodes:             1,
+		JobID:                0,
+		UseSimpleImportSpans: false,
+	}
+}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 05e126208115..7e17d528f48e 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1599,6 +1599,10 @@ type BackupRestoreTestingKnobs struct {
 	// execution.
 	CaptureResolvedTableDescSpans func([]roachpb.Span)
 
+	// RunAfterSplitAndScatteringEntry allows blocking the RESTORE job after a
+	// single RestoreSpanEntry has been split and scattered.
+	RunAfterSplitAndScatteringEntry func(ctx context.Context)
+
 	// RunAfterProcessingRestoreSpanEntry allows blocking the RESTORE job after a
 	// single RestoreSpanEntry has been processed and added to the SSTBatcher.
 	RunAfterProcessingRestoreSpanEntry func(ctx context.Context)
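Reviewer note: below is a minimal, self-contained sketch of the two patterns this patch combines, a channel send wrapped in a select on ctx.Done() so the producer can be interrupted, and a testing-knob-style hook that cancels the context after the first entry, which is how the new test forces runGenerativeSplitAndScatter to return. The names produce and afterEntry are hypothetical stand-ins, not CockroachDB code.

// cancellable_send_sketch.go: illustrative only; produce stands in for
// runGenerativeSplitAndScatter and afterEntry for the
// RunAfterSplitAndScatteringEntry testing knob.
package main

import (
	"context"
	"fmt"
)

// produce sends entries on out until ctx is canceled. Each send is wrapped
// in a select on ctx.Done(), so a blocked send cannot hang forever; this is
// the same fix the patch applies to the restoreEntryChunksCh sends.
func produce(ctx context.Context, out chan<- int, afterEntry func(context.Context)) error {
	for i := 0; ; i++ {
		if afterEntry != nil {
			afterEntry(ctx) // testing hook, runs once per entry
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // interrupted: report why instead of blocking
		case out <- i:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Unbuffered and never drained: without the select, the send would
	// block forever, exactly like a full chunk channel with no consumer.
	out := make(chan int)
	errCh := make(chan error, 1)
	go func() {
		// The hook cancels the context on the first entry, mirroring the test.
		errCh <- produce(ctx, out, func(context.Context) { cancel() })
	}()
	fmt.Println(<-errCh) // prints "context canceled"
}

As in the new test, the outcome is deterministic: nothing ever receives from out, so once the hook cancels the context the ctx.Done() branch is the only case that can fire.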