Merge #101730
101730: roachtest: add mixed-version restores to backup test r=smg260,herkolategan a=renatolabs

This commit updates the `backup/mixed-version` roachtest (now
`backup-restore/mixed-version`) to also perform mixed-version
restores. At a high level, it introduces a new function, run in
mixed-version state, that randomly attempts to restore a subset of
the backups taken up to that point in the test.

The test verifies that, while in mixed version, we are able to
restore backups taken in the previous version _and_ backups taken in
mixed version.
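
For illustration only, a minimal, self-contained sketch of this
sampling approach (the names `backupCollection`, `restoreProbability`,
and `sampleForRestore` are simplified stand-ins, not the test's actual
API; cluster backups are skipped in mixed version since restoring them
requires wiping the cluster):

package main

import (
	"fmt"
	"math/rand"
)

type backupCollection struct {
	name      string
	isCluster bool
}

// restoreProbability mirrors the test's mixedVersionRestoreProbability.
const restoreProbability = 0.5

// sampleForRestore picks a random subset of the backups taken so far.
func sampleForRestore(rng *rand.Rand, collections []*backupCollection) []*backupCollection {
	var sample []*backupCollection
	for _, c := range collections {
		// Cluster backups are not restored in mixed version: they
		// require wiping the cluster first.
		if c.isCluster {
			continue
		}
		if rng.Float64() < restoreProbability {
			sample = append(sample, c)
		}
	}
	return sample
}

func main() {
	// Fixed seed: reusing the same seed produces the same sample,
	// matching the test's reproducibility goal.
	rng := rand.New(rand.NewSource(42))
	backups := []*backupCollection{
		{name: "1_previous-to-current_full"},
		{name: "2_previous-to-current_cluster", isCluster: true},
		{name: "3_previous-to-current_database"},
	}
	for _, c := range sampleForRestore(rng, backups) {
		fmt.Printf("would restore and verify %s\n", c.name)
	}
}

Each sampled collection is then restored and its contents compared
against those captured when the backup was taken (see
`verifyBackupCollection` in the diff below).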

Resolves: #96367.

Epic: [CRDB-19321](https://cockroachlabs.atlassian.net/browse/CRDB-19321)

Release note: None

Co-authored-by: Renato Costa <[email protected]>
craig[bot] and renatolabs committed May 10, 2023
2 parents a833450 + b5410ea commit 270f0cf
Showing 1 changed file with 185 additions and 74 deletions.

pkg/cmd/roachtest/tests/mixed_version_backup.go
@@ -55,6 +55,10 @@ const (
// columns that cannot be validated after restore, for various
// reasons. See `handleSpecialCases` for more details.
systemTableSentinel = "{SENTINEL}"

// probability that we will attempt to restore a backup in
// mixed-version state.
mixedVersionRestoreProbability = 0.5
)

var (
@@ -887,6 +891,11 @@ type mixedVersionBackup struct {
// identifiers to backups created during the test
currentBackupID int64

// counter of restores, incremented atomically. Provides unique
// database names that are used as restore targets when table and
// database backups are restored.
currentRestoreID int64

// stopWorkloads can be called to stop any workloads started in
// this test. Useful when restoring cluster backups, as we don't
// want a stream of errors in the workload due to the nodes
@@ -897,8 +906,7 @@ type mixedVersionBackup struct {
func newMixedVersionBackup(
c cluster.Cluster, roachNodes option.NodeListOption, dbs ...string,
) *mixedVersionBackup {
mvb := &mixedVersionBackup{cluster: c, dbs: dbs, roachNodes: roachNodes}
return mvb
return &mixedVersionBackup{cluster: c, dbs: dbs, roachNodes: roachNodes}
}

// newBackupType chooses a random backup type (table, database,
@@ -1011,6 +1019,52 @@ func (mvb *mixedVersionBackup) loadBackupData(
)
}

// takePreviousVersionBackup creates a backup collection (full +
// incremental), and is supposed to be called before any nodes are
// upgraded. This ensures that we are able to restore this backup
// later, when we are in mixed version, and also after the upgrade is
// finalized.
func (mvb *mixedVersionBackup) takePreviousVersionBackup(
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper,
) error {
// Wait here for a few minutes to allow the workloads (which are
// initializing concurrently with this step) to store some data in
// the cluster by the time the backup is taken. The actual wait time
// chosen is somewhat arbitrary: it's less time than workloads
// typically need to finish initializing (especially tpcc), so the
// backup is taken while data is still being inserted. The actual
// time is irrelevant as far as correctness is concerned: we should
// be able to restore this backup after upgrading regardless of the
// amount of data backed up.
wait := 3 * time.Minute
l.Printf("waiting for %s", wait)
time.Sleep(wait)

if err := mvb.loadTables(ctx, l, rng, h); err != nil {
return err
}

var collection backupCollection
var timestamp string
var err error

// Create full backup.
previousVersion := h.Context().FromVersion
label := fmt.Sprintf("before upgrade in %s", sanitizeVersionForBackup(previousVersion))
collection, _, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, mvb.roachNodes, h)
if err != nil {
return err
}

// Create incremental backup.
collection, timestamp, err = mvb.runBackup(ctx, l, incrementalBackup{collection}, rng, mvb.roachNodes, h)
if err != nil {
return err
}

return mvb.saveContents(ctx, l, rng, &collection, timestamp, h)
}

// randomWait waits from 1s to 5m, to allow for the background
// workload to update the underlying table we are backing up.
func (mvb *mixedVersionBackup) randomWait(l *logger.Logger, rng *rand.Rand) {
@@ -1024,10 +1078,14 @@ func (mvb *mixedVersionBackup) now() string {
return hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}.AsOfSystemTime()
}

func (mvb *mixedVersionBackup) nextID() int64 {
func (mvb *mixedVersionBackup) nextBackupID() int64 {
return atomic.AddInt64(&mvb.currentBackupID, 1)
}

func (mvb *mixedVersionBackup) nextRestoreID() int64 {
return atomic.AddInt64(&mvb.currentRestoreID, 1)
}

// backupName returns a descriptive name for a backup depending on the
// state of the test we are in. The given label is also used to
// provide more context. Example: '3_22.2.4-to-current_final'
@@ -1211,7 +1269,7 @@ func (mvb *mixedVersionBackup) runBackup(
switch b := bType.(type) {
case fullBackup:
btype := mvb.newBackupType(rng)
name := mvb.backupName(mvb.nextID(), h, b.label, btype)
name := mvb.backupName(mvb.nextBackupID(), h, b.label, btype)
createOptions := newBackupOptions(rng)
collection = newBackupCollection(name, btype, createOptions)
l.Printf("creating full backup for %s", collection.name)
@@ -1299,7 +1357,7 @@ func (mvb *mixedVersionBackup) createBackupCollection(
if err := mvb.runJobOnOneOf(ctx, l, fullBackupSpec.Execute.Nodes, h, func() error {
var err error
label := backupCollectionDesc(fullBackupSpec, incBackupSpec)
collection, timestamp, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, fullBackupSpec.Plan.Nodes, h)
collection, _, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, fullBackupSpec.Plan.Nodes, h)
return err
}); err != nil {
return err
@@ -1369,7 +1427,7 @@ func (mvb *mixedVersionBackup) disableJobAdoption(
}
return nil
}); err != nil {
return err
l.Printf("giving up (probably a mixed-version restore running concurently)")
}

l.Printf("node %d: job adoption disabled", node)
@@ -1478,6 +1536,90 @@ func (mvb *mixedVersionBackup) planAndRunBackups(
return mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h)
}

// verifyBackupCollection restores the backup collection passed and
// verifies that the contents after the restore match the contents
// when the backup was taken.
func (mvb *mixedVersionBackup) verifyBackupCollection(
ctx context.Context,
l *logger.Logger,
rng *rand.Rand,
h *mixedversion.Helper,
collection *backupCollection,
) error {
l.Printf("verifying %s", collection.name)

// Defaults for the database where the backup will be restored,
// along with the expected names of the tables after restore.
restoreDB := fmt.Sprintf(
"restore_%s_%d", invalidDBNameRE.ReplaceAllString(collection.name, "_"), mvb.nextRestoreID(),
)
restoredTables := tableNamesWithDB(restoreDB, collection.tables)

// Pre-requisites:
switch collection.btype.(type) {
case *clusterBackup:
// For cluster backups, the restored tables are always the same
// as the tables we backed up. In addition, we need to wipe the
// cluster before attempting a restore.
restoredTables = collection.tables
if err := mvb.resetCluster(ctx, l); err != nil {
return err
}
case *tableBackup:
// If we are restoring a table backup, we need to create the
// target database first.
if err := h.Exec(rng, fmt.Sprintf("CREATE DATABASE %s", restoreDB)); err != nil {
return fmt.Errorf("backup %s: error creating database %s: %w", collection.name, restoreDB, err)
}
}

restoreCmd, options := collection.btype.RestoreCommand(restoreDB)
restoreOptions := append([]string{"detached"}, options...)
// If the backup was created with an encryption passphrase, we
// need to include it when restoring as well.
for _, option := range collection.options {
if ep, ok := option.(encryptionPassphrase); ok {
restoreOptions = append(restoreOptions, ep.String())
}
}

var optionsStr string
if len(restoreOptions) > 0 {
optionsStr = fmt.Sprintf(" WITH %s", strings.Join(restoreOptions, ", "))
}
restoreStmt := fmt.Sprintf(
"%s FROM LATEST IN '%s'%s",
restoreCmd, collection.uri(), optionsStr,
)
var jobID int
if err := h.QueryRow(rng, restoreStmt).Scan(&jobID); err != nil {
return fmt.Errorf("backup %s: error in restore statement: %w", collection.name, err)
}

if err := mvb.waitForJobSuccess(ctx, l, rng, h, jobID); err != nil {
return err
}

restoredContents, err := mvb.computeTableContents(
ctx, l, rng, restoredTables, collection.contents, "" /* timestamp */, h,
)
if err != nil {
return fmt.Errorf("backup %s: error loading restored contents: %w", collection.name, err)
}

for j, contents := range collection.contents {
table := collection.tables[j]
restoredTableContents := restoredContents[j]
l.Printf("%s: verifying %s", collection.name, table)
if err := contents.ValidateRestore(ctx, l, restoredTableContents); err != nil {
return fmt.Errorf("backup %s: %w", collection.name, err)
}
}

l.Printf("%s: OK", collection.name)
return nil
}

// resetCluster wipes the entire cluster and starts it again with the
// current latest binary. This is done before we attempt restoring a
// full cluster backup.
@@ -1491,81 +1633,47 @@ func (mvb *mixedVersionBackup) resetCluster(ctx context.Context, l *logger.Logger
)
}

// verifyBackups cycles through all the backup collections created for
// the duration of the test, and verifies that restoring the backups
// results in the same data as when the backup was taken.
func (mvb *mixedVersionBackup) verifyBackups(
// verifySomeBackups is supposed to be called in mixed-version
// state. It will randomly pick a sample of the backups taken by the
// test so far (according to `mixedVersionRestoreProbability`), and
// validate the restore.
func (mvb *mixedVersionBackup) verifySomeBackups(
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper,
) error {
l.Printf("stopping background workloads")
mvb.stopWorkloads()

l.Printf("verifying %d collections created during this test", len(mvb.collections))
var toBeRestored []*backupCollection
for _, collection := range mvb.collections {
l.Printf("verifying %s", collection.name)

// Defaults for the database where the backup will be restored,
// along with the expected names of the tables after restore.
restoreDB := fmt.Sprintf("restore_%s", invalidDBNameRE.ReplaceAllString(collection.name, "_"))
restoredTables := tableNamesWithDB(restoreDB, collection.tables)

// Pre-requisites:
switch collection.btype.(type) {
case *clusterBackup:
// For cluster backups, the restored tables are always the same
// as the tables we backed up. In addition, we need to wipe the
// cluster before attempting a restore.
restoredTables = collection.tables
if err := mvb.resetCluster(ctx, l); err != nil {
return err
}
case *tableBackup:
// If we are restoring a table backup, we need to create the
// target database first.
if err := h.Exec(rng, fmt.Sprintf("CREATE DATABASE %s", restoreDB)); err != nil {
return fmt.Errorf("backup %s: error creating database %s: %w", collection.name, restoreDB, err)
}
if _, isCluster := collection.btype.(*clusterBackup); isCluster {
continue
}

restoreCmd, options := collection.btype.RestoreCommand(restoreDB)
restoreOptions := append([]string{}, options...)
// If the backup was created with an encryption passphrase, we
// need to include it when restoring as well.
for _, option := range collection.options {
if ep, ok := option.(encryptionPassphrase); ok {
restoreOptions = append(restoreOptions, ep.String())
}
if rng.Float64() < mixedVersionRestoreProbability {
toBeRestored = append(toBeRestored, collection)
}
}

var optionsStr string
if len(restoreOptions) > 0 {
optionsStr = fmt.Sprintf(" WITH %s", strings.Join(restoreOptions, ", "))
}
restoreStmt := fmt.Sprintf(
"%s FROM LATEST IN '%s'%s",
restoreCmd, collection.uri(), optionsStr,
)
if err := h.Exec(rng, restoreStmt); err != nil {
return fmt.Errorf("backup %s: error restoring: %w", collection.name, err)
l.Printf("verifying %d out of %d backups in mixed version", len(toBeRestored), len(mvb.collections))
for _, collection := range toBeRestored {
if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection); err != nil {
return err
}
}

restoredContents, err := mvb.computeTableContents(
ctx, l, rng, restoredTables, collection.contents, "" /* timestamp */, h,
)
if err != nil {
return fmt.Errorf("backup %s: error loading restored contents: %w", collection.name, err)
}
return nil
}

for j, contents := range collection.contents {
table := collection.tables[j]
restoredTableContents := restoredContents[j]
l.Printf("%s: verifying %s", collection.name, table)
if err := contents.ValidateRestore(ctx, l, restoredTableContents); err != nil {
return fmt.Errorf("backup %s: %w", collection.name, err)
}
}
// verifyAllBackups cycles through all the backup collections created
// for the duration of the test, and verifies that restoring the
// backups results in the same data as when the backup was taken.
func (mvb *mixedVersionBackup) verifyAllBackups(
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper,
) error {
l.Printf("stopping background workloads")
mvb.stopWorkloads()

l.Printf("%s: OK", collection.name)
l.Printf("verifying %d collections created during this test", len(mvb.collections))
for _, collection := range mvb.collections {
if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection); err != nil {
return err
}
}

return nil
@@ -1578,8 +1686,8 @@ func registerBackupMixedVersion(r registry.Registry) {
// different set of events. Reusing the same seed will produce the
// same test.
r.Add(registry.TestSpec{
Name: "backup/mixed-version",
Timeout: 5 * time.Hour,
Name: "backup-restore/mixed-version",
Timeout: 7 * time.Hour,
Owner: registry.OwnerDisasterRecovery,
Cluster: r.MakeClusterSpec(5),
EncryptionSupport: registry.EncryptionMetamorphic,
@@ -1613,11 +1721,14 @@ func registerBackupMixedVersion(r registry.Registry) {
mvb := newMixedVersionBackup(c, roachNodes, "bank", "tpcc")
mvt.OnStartup("set short job interval", mvb.setShortJobIntervals)
mvt.OnStartup("load backup data", mvb.loadBackupData)
mvt.OnStartup("take backup in previous version", mvb.takePreviousVersionBackup)

stopBank = mvt.Workload("bank", workloadNode, nil /* initCmd */, bankRun)
stopTPCC = mvt.Workload("tpcc", workloadNode, tpccInit, tpccRun)

mvt.InMixedVersion("plan and run backups", mvb.planAndRunBackups)
mvt.AfterUpgradeFinalized("verify backups", mvb.verifyBackups)
mvt.InMixedVersion("verify some backups", mvb.verifySomeBackups)
mvt.AfterUpgradeFinalized("verify all backups", mvb.verifyAllBackups)

mvb.stopWorkloads = func() {
stopBank()
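As a standalone aside, a hedged sketch of the restore-statement
assembly performed by `verifyBackupCollection` above. The restore
command and collection URI values below are illustrative; `detached`
and `encryption_passphrase` are the options the test actually passes,
while the real command prefix comes from `RestoreCommand`, defined
elsewhere in this file:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative stand-ins for RestoreCommand's output and the
	// collection URI; both come from the backup collection in the test.
	restoreCmd := "RESTORE DATABASE bank"
	uri := "nodelocal://1/backups/example"
	options := []string{"detached", "encryption_passphrase = 'secret'"}

	var optionsStr string
	if len(options) > 0 {
		optionsStr = fmt.Sprintf(" WITH %s", strings.Join(options, ", "))
	}
	// Prints:
	// RESTORE DATABASE bank FROM LATEST IN 'nodelocal://1/backups/example' WITH detached, encryption_passphrase = 'secret'
	fmt.Printf("%s FROM LATEST IN '%s'%s\n", restoreCmd, uri, optionsStr)
}

The `detached` option makes RESTORE return a job ID immediately rather
than blocking, which is why the test scans the statement's result into
`jobID` and then waits for job success before validating contents.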
