Skip to content

Commit

Permalink
backupccl: add RESTORE with schema_only
Browse files Browse the repository at this point in the history
Fixes #83470

Release note (sql change): This PR adds the schema_only flag to RESTORE,
allowing a user to run a normal RESTORE without restoring any user table data.
This can be used to quickly validate that a given backup is restorable. A
schema_only restore's runtime is O(# of descriptors), which is a fraction of a
regular restore's runtime of O(# of table rows).

Note that during a cluster-level schema_only restore, the system tables are
read from S3 and written to disk, as this provides important validation
coverage without much runtime cost (system tables should not be large).

After running a successful schema_only RESTORE, the user can revert the cluster
to its pre-restore state by simply dropping the descriptors the schema_only
restore added (e.g. if the user restored a database, they can drop the
database after the restore completes). Note that in the cluster-level case, the
restored system data cannot be reverted; this shouldn't matter, as the cluster
was empty beforehand.

For the Backup validation use case, RESTORE with schema_only provides
near-total validation coverage. In other words, if a user's schema_only RESTORE
works, they can be quite confident that a real RESTORE will work. There's one
notable place where schema_only RESTORE lacks coverage:

It doesn't read from (or write to) any of the SSTs that store backed-up user
table data. To ensure a Backup's SSTs are where the RESTORE cmd would expect
them to be, a user should run SHOW BACKUP ... with check_files. Further, in an
upcoming patch, another flag for RESTORE validation will be introduced --
the verify_backup_table_data flag -- which extends schema_only functionality
to read the table data from S3 and run checksums on it. As with the
schema_only flag, no table data will be ingested into the cluster.
  • Loading branch information
msbutler committed Aug 3, 2022
1 parent 05e3d5d commit 968da3a
Show file tree
Hide file tree
Showing 20 changed files with 640 additions and 199 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,7 @@ unreserved_keyword ::=
| 'RUNNING'
| 'SCHEDULE'
| 'SCHEDULES'
| 'SCHEMA_ONLY'
| 'SCROLL'
| 'SETTING'
| 'SETTINGS'
Expand Down Expand Up @@ -2453,6 +2454,7 @@ restore_options ::=
| 'NEW_DB_NAME' '=' string_or_placeholder
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'TENANT' '=' string_or_placeholder
| 'SCHEMA_ONLY'

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backup_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
telemetryOptionSkipMissingSequenceOwners = "skip_missing_sequence_owners"
telemetryOptionSkipMissingViews = "skip_missing_views"
telemetryOptionSkipLocalitiesCheck = "skip_localities_check"
telemetryOptionSchemaOnly = "schema_only"
)

// logBackupTelemetry publishes an eventpb.RecoveryEvent about a manually
Expand Down Expand Up @@ -397,6 +398,9 @@ func logRestoreTelemetry(
if opts.Detached {
options = append(options, telemetryOptionDetached)
}
if opts.SchemaOnly {
options = append(options, telemetryOptionSchemaOnly)
}
sort.Strings(options)

event := &eventpb.RecoveryEvent{
Expand Down
168 changes: 89 additions & 79 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2675,99 +2675,109 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) {
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Protects numTypeChangesStarted and numTypeChangesFinished.
var mu syncutil.Mutex
numTypeChangesStarted := 0
numTypeChangesFinished := 0
typeChangesStarted := make(chan struct{})
waitForBackup := make(chan struct{})
typeChangesFinished := make(chan struct{})
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 0, InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if numTypeChangesStarted < len(tc.queries) {
numTypeChangesStarted++
if numTypeChangesStarted == len(tc.queries) {
close(typeChangesStarted)
for _, isSchemaOnly := range []bool{true, false} {
suffix := ""
if isSchemaOnly {
suffix = "-schema-only"
}
t.Run(tc.name+suffix, func(t *testing.T) {
// Protects numTypeChangesStarted and numTypeChangesFinished.
var mu syncutil.Mutex
numTypeChangesStarted := 0
numTypeChangesFinished := 0
typeChangesStarted := make(chan struct{})
waitForBackup := make(chan struct{})
typeChangesFinished := make(chan struct{})
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 0, InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if numTypeChangesStarted < len(tc.queries) {
numTypeChangesStarted++
if numTypeChangesStarted == len(tc.queries) {
close(typeChangesStarted)
}
mu.Unlock()
<-waitForBackup
} else {
mu.Unlock()
}
mu.Unlock()
<-waitForBackup
} else {
mu.Unlock()
}
return nil
return nil
},
},
},
},
},
})
defer cleanupFn()
})
defer cleanupFn()

// Create a database with a type.
sqlDB.Exec(t, `
// Create a database with a type.
sqlDB.Exec(t, `
CREATE DATABASE d;
CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
`)

// Start ALTER TYPE statement(s) that will block.
for _, query := range tc.queries {
go func(query string, totalQueries int) {
// Note we don't use sqlDB.Exec here because we can't Fatal from within a goroutine.
if _, err := sqlDB.DB.ExecContext(context.Background(), query); err != nil {
t.Error(err)
}
mu.Lock()
numTypeChangesFinished++
if numTypeChangesFinished == totalQueries {
close(typeChangesFinished)
}
mu.Unlock()
}(query, len(tc.queries))
}

// Wait on the type changes to start.
<-typeChangesStarted
// Start ALTER TYPE statement(s) that will block.
for _, query := range tc.queries {
go func(query string, totalQueries int) {
// Note we don't use sqlDB.Exec here because we can't Fatal from within a goroutine.
if _, err := sqlDB.DB.ExecContext(context.Background(), query); err != nil {
t.Error(err)
}
mu.Lock()
numTypeChangesFinished++
if numTypeChangesFinished == totalQueries {
close(typeChangesFinished)
}
mu.Unlock()
}(query, len(tc.queries))
}

// Now create a backup while the type change job is blocked so that
// greeting is backed up with some enum members in READ_ONLY state.
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/test/'`)
// Wait on the type changes to start.
<-typeChangesStarted

// Let the type change finish.
close(waitForBackup)
<-typeChangesFinished
// Now create a backup while the type change job is blocked so that
// greeting is backed up with some enum members in READ_ONLY state.
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/test/'`)

// Now drop the database and restore.
sqlDB.Exec(t, `DROP DATABASE d`)
sqlDB.Exec(t, `RESTORE DATABASE d FROM 'nodelocal://0/test/'`)
// Let the type change finish.
close(waitForBackup)
<-typeChangesFinished

// The type change job should be scheduled and finish. Note that we can't use
// sqlDB.CheckQueryResultsRetry as it Fatal's upon an error. The case below
// will error until the job completes.
for i, query := range tc.succeedAfter {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(context.Background(), query)
return err
})
sqlDB.CheckQueryResults(t, query, [][]string{{tc.expectedSuccess[i]}})
}
// Now drop the database and restore.
sqlDB.Exec(t, `DROP DATABASE d`)
restoreQuery := `RESTORE DATABASE d FROM 'nodelocal://0/test/'`
if isSchemaOnly {
restoreQuery = restoreQuery + " with schema_only"
}
sqlDB.Exec(t, restoreQuery)

// The type change job should be scheduled and finish. Note that we can't use
// sqlDB.CheckQueryResultsRetry as it Fatal's upon an error. The case below
// will error until the job completes.
for i, query := range tc.succeedAfter {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(context.Background(), query)
return err
})
sqlDB.CheckQueryResults(t, query, [][]string{{tc.expectedSuccess[i]}})
}

for i, query := range tc.errorAfter {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(context.Background(), query)
if err == nil {
return errors.New("expected error, found none")
}
if !testutils.IsError(err, tc.expectedError[i]) {
return errors.Newf("expected error %q, found %v", tc.expectedError[i], pgerror.FullError(err))
}
return nil
})
}
})
for i, query := range tc.errorAfter {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(context.Background(), query)
if err == nil {
return errors.New("expected error, found none")
}
if !testutils.IsError(err, tc.expectedError[i]) {
return errors.Newf("expected error %q, found %v", tc.expectedError[i], pgerror.FullError(err))
}
return nil
})
}
})
}
}
}

Expand Down
36 changes: 25 additions & 11 deletions pkg/ccl/backupccl/backuprand/backup_rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
// TestBackupRestoreRandomDataRoundtrips conducts backup/restore roundtrips on
// randomly generated tables and verifies their data and schema are preserved.
// It tests that full database backup as well as all subsets of per-table backup
// roundtrip properly.
// roundtrip properly. 50% of the time, the test runs the restore with the
// schema_only parameter, which does not restore any rows from user tables.
func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -66,6 +67,11 @@ func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
}
numInserts := 20

runSchemaOnlyExtension := ""
if rng.Intn(10)%2 == 0 {
runSchemaOnlyExtension = ", schema_only"
}

tables := sqlDB.Query(t, `SELECT name FROM crdb_internal.tables WHERE
database_name = 'rand' AND schema_name = 'public'`)
var tableNames []string
Expand All @@ -87,7 +93,9 @@ database_name = 'rand' AND schema_name = 'public'`)
expectedData := make(map[string][][]string)
for _, tableName := range tableNames {
expectedCreateTableStmt[tableName] = sqlDB.QueryStr(t, fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE %s]`, tableName))[0][0]
expectedData[tableName] = sqlDB.QueryStr(t, fmt.Sprintf(`SELECT * FROM %s`, tableName))
if runSchemaOnlyExtension == "" {
expectedData[tableName] = sqlDB.QueryStr(t, fmt.Sprintf(`SELECT * FROM %s`, tableName))
}
}

// Now that we've created our random tables, backup and restore the whole DB
Expand All @@ -97,12 +105,12 @@ database_name = 'rand' AND schema_name = 'public'`)
tablesBackup := localFoo + "alltables"
dbBackups := []string{dbBackup, tablesBackup}
if err := backuputils.VerifyBackupRestoreStatementResult(
t, sqlDB, "BACKUP DATABASE rand TO $1", dbBackup,
t, sqlDB, "BACKUP DATABASE rand INTO $1", dbBackup,
); err != nil {
t.Fatal(err)
}
if err := backuputils.VerifyBackupRestoreStatementResult(
t, sqlDB, "BACKUP TABLE rand.* TO $1", tablesBackup,
t, sqlDB, "BACKUP TABLE rand.* INTO $1", tablesBackup,
); err != nil {
t.Fatal(err)
}
Expand All @@ -118,7 +126,12 @@ database_name = 'rand' AND schema_name = 'public'`)
fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE %s]`, restoreTable))[0][0]
assert.Equal(t, expectedCreateTableStmt[tableName], createStmt,
"SHOW CREATE %s not equal after RESTORE", tableName)
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT * FROM %s`, tableName), expectedData[tableName])
if runSchemaOnlyExtension == "" {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT * FROM %s`, restoreTable), expectedData[tableName])
} else {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM %s`, restoreTable),
[][]string{{"0"}})
}
}
}

Expand All @@ -128,17 +141,17 @@ database_name = 'rand' AND schema_name = 'public'`)
for _, backup := range dbBackups {
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb")
sqlDB.Exec(t, "CREATE DATABASE restoredb")
tableQuery := fmt.Sprintf("RESTORE rand.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb'%s)", runSchemaOnlyExtension)
if err := backuputils.VerifyBackupRestoreStatementResult(
t, sqlDB, "RESTORE rand.* FROM $1 WITH OPTIONS (into_db='restoredb')", backup,
t, sqlDB, tableQuery, backup,
); err != nil {
t.Fatal(err)
}
verifyTables(t, tableNames)
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb")

if err := backuputils.VerifyBackupRestoreStatementResult(
t, sqlDB, "RESTORE DATABASE rand FROM $1 WITH OPTIONS (new_db_name='restoredb')", backup,
); err != nil {
dbQuery := fmt.Sprintf("RESTORE DATABASE rand FROM LATEST IN $1 WITH OPTIONS (new_db_name='restoredb'%s)", runSchemaOnlyExtension)
if err := backuputils.VerifyBackupRestoreStatementResult(t, sqlDB, dbQuery, backup); err != nil {
t.Fatal(err)
}
verifyTables(t, tableNames)
Expand All @@ -155,8 +168,9 @@ database_name = 'rand' AND schema_name = 'public'`)
}
tables := strings.Join(combo, ", ")
t.Logf("Testing subset backup/restore %s", tables)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TABLE %s TO $1`, tables), backupTarget)
_, err := tc.Conns[0].Exec(fmt.Sprintf("RESTORE TABLE %s FROM $1 WITH OPTIONS (into_db='restoredb')", tables),
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TABLE %s INTO $1`, tables), backupTarget)
_, err := tc.Conns[0].Exec(
fmt.Sprintf("RESTORE TABLE %s FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb' %s)", tables, runSchemaOnlyExtension),
backupTarget)
if err != nil {
if strings.Contains(err.Error(), "skip_missing_foreign_keys") {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -138,10 +137,6 @@ func newRestoreDataProcessor(
) (execinfra.Processor, error) {
sv := &flowCtx.Cfg.Settings.SV

if spec.Validation != jobspb.RestoreValidation_DefaultRestore {
return nil, errors.New("Restore Data Processor does not support validation yet")
}

rd := &restoreDataProcessor{
flowCtx: flowCtx,
input: input,
Expand Down
Loading

0 comments on commit 968da3a

Please sign in to comment.