address renato comments
msbutler committed Jun 12, 2023
1 parent b5bcdaa commit 6655584
Showing 2 changed files with 79 additions and 54 deletions.
94 changes: 55 additions & 39 deletions pkg/cmd/roachtest/tests/backup_fixtures.go
@@ -29,46 +29,49 @@ import (
func makeBackupFixtureSpecs(override scheduledBackupSpecs) scheduledBackupSpecs {
backupSpecs := makeBackupSpecs(override.backupSpecs, defaultBackupFixtureSpecs.backupSpecs)
specs := scheduledBackupSpecs{
-backupSpecs: backupSpecs,
-crontab: defaultBackupFixtureSpecs.crontab,
+backupSpecs: backupSpecs,
+incrementalBackupCrontab: defaultBackupFixtureSpecs.incrementalBackupCrontab,
}
-if override.crontab != "" {
-specs.crontab = override.crontab
+if override.incrementalBackupCrontab != "" {
+specs.incrementalBackupCrontab = override.incrementalBackupCrontab
}
-override.ignoreExistingBackups = specs.ignoreExistingBackups
+// TODO(msbutler): validate the crdb version roachtest will use. We don't want to create a 23.1.0
+// backup with a master binary, for example.
+specs.ignoreExistingBackups = override.ignoreExistingBackups
return specs
}
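
For orientation, a hedged sketch of how a caller overrides these defaults after the rename (it mirrors the 15 GB fixture registered further down in this diff; only a non-empty incrementalBackupCrontab replaces the default, while ignoreExistingBackups is assigned through unconditionally):

    specs := makeBackupFixtureSpecs(scheduledBackupSpecs{
        // Replaces the default "*/5 * * * *" cadence.
        incrementalBackupCrontab: "*/2 * * * *",
        // Assigned directly; there is no merge step for booleans.
        ignoreExistingBackups: true,
    })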

// defaultBackupFixtureSpecs defines the default scheduled backup used to create a fixture.
var defaultBackupFixtureSpecs = scheduledBackupSpecs{
-crontab: "*/5 * * * *",
+// Runs an incremental backup every 5 minutes.
+incrementalBackupCrontab: "*/5 * * * *",

// The default option of false prevents roachtest users from overriding the
// latest backup in a collection, which may be used in restore roachtests.
ignoreExistingBackups: false,

backupSpecs: backupSpecs{
version: "23.1.0",
version: "v23.1.1",
cloud: spec.AWS,
fullBackupDir: "LATEST",
-backupsIncluded: 24,
+backupsIncluded: 48,
workload: tpceRestore{
customers: 25000,
},
},
}

+const scheduleLabel = "schedule_cluster"

type scheduledBackupSpecs struct {
backupSpecs

// ignoreExistingBackups if set to true, will allow a new backup chain
-// to get written to an already existing backup collection. The default option
-// of false prevents roachtest users from overriding the latest backup in a
-// collection, which may be used in restore roachtests.
-ignoreExistingBackups bool
-crontab string
+// to get written to an already existing backup collection.
+ignoreExistingBackups bool
+incrementalBackupCrontab string
}

func (sbs scheduledBackupSpecs) scheduledBackupCmd() string {
// This backup schedule will first run a full backup immediately and then the
-// incremental backups at the given crontab cadence until the user cancels the
+// incremental backups at the given incrementalBackupCrontab cadence until the user cancels the
// backup schedules. To ensure that only one full backup chain gets created,
// begin the backup schedule at the beginning of the week, as a new full
// backup will get created on Sunday at Midnight ;)
@@ -77,8 +80,8 @@ func (sbs scheduledBackupSpecs) scheduledBackupCmd() string {
ignoreExistingBackupsOpt = "ignore_existing_backups"
}
backupCmd := fmt.Sprintf(`BACKUP INTO %s WITH revision_history`, sbs.backupCollection())
-cmd := fmt.Sprintf(`CREATE SCHEDULE schedule_cluster FOR %s RECURRING '%s' FULL BACKUP '@weekly' WITH SCHEDULE OPTIONS first_run = 'now', %s`,
-backupCmd, sbs.crontab, ignoreExistingBackupsOpt)
+cmd := fmt.Sprintf(`CREATE SCHEDULE %s FOR %s RECURRING '%s' FULL BACKUP '@weekly' WITH SCHEDULE OPTIONS first_run = 'now', %s`,
+scheduleLabel, backupCmd, sbs.incrementalBackupCrontab, ignoreExistingBackupsOpt)
return cmd
}
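
For reference, a sketch of the statement this renders with the default specs above, shown with ignoreExistingBackups set so the final %s option slot is non-empty. The collection URI is hypothetical, since backupCollection() sits outside this diff:

    // Hypothetical rendering of scheduledBackupCmd with the defaults above.
    const exampleScheduleStmt = `CREATE SCHEDULE schedule_cluster FOR ` +
        `BACKUP INTO 's3://bucket/fixture?AUTH=implicit' WITH revision_history ` +
        `RECURRING '*/5 * * * *' FULL BACKUP '@weekly' ` +
        `WITH SCHEDULE OPTIONS first_run = 'now', ignore_existing_backups`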

@@ -129,6 +132,7 @@ func (bd *backupDriver) prepareCluster(ctx context.Context) {

bd.c.Put(ctx, bd.t.Cockroach(), "./cockroach")
bd.c.Start(ctx, bd.t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings(), bd.sp.hardware.getCRDBNodes())
+bd.assertCorrectCockroachBinary(ctx)
if !bd.sp.backup.ignoreExistingBackups {
// This check allows the roachtest to fail fast, instead of when the
// scheduled backup cmd is issued.
@@ -147,10 +151,19 @@ func (bd *backupDriver) checkForExistingBackupCollection(ctx context.Context) bool {
return collectionCount > 0
}

+func (bd *backupDriver) assertCorrectCockroachBinary(ctx context.Context) {
+binaryQuery := "SELECT value FROM crdb_internal.node_build_info WHERE field = 'Version'"
+conn := bd.c.Conn(ctx, bd.t.L(), 1)
+sql := sqlutils.MakeSQLRunner(conn)
+var binaryVersion string
+sql.QueryRow(bd.t, binaryQuery).Scan(&binaryVersion)
+require.Equal(bd.t, bd.sp.backup.version, binaryVersion, "cluster not running on expected binary")
+}
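
Worth spelling out the connection to the version bump above: the Version field of crdb_internal.node_build_info reports the full release tag, leading 'v' included, which is presumably why the default spec's version string becomes "v23.1.1" rather than "23.1.0" in this commit. A minimal sketch of the same check outside the test harness (connection details assumed):

    // Assumes: import gosql "database/sql" and a registered postgres driver.
    func currentBinaryVersion(db *gosql.DB) (string, error) {
        var v string
        err := db.QueryRow(
            `SELECT value FROM crdb_internal.node_build_info WHERE field = 'Version'`,
        ).Scan(&v)
        return v, err // e.g. "v23.1.1" when running a 23.1.1 release binary
    }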

func (bd *backupDriver) initWorkload(ctx context.Context) {
if bd.sp.initFromBackupSpecs.version == "" {
-bd.t.L().Printf(`Initializing workload via ./workload init`)
-bd.sp.backup.workload.initWorkload(ctx, bd.t, bd.c, bd.sp.hardware)
+bd.t.L().Printf(`Initializing workload`)
+bd.sp.backup.workload.init(ctx, bd.t, bd.c, bd.sp.hardware)
return
}
bd.t.L().Printf(`Initializing workload via restore`)
@@ -167,7 +180,7 @@ func (bd *backupDriver) initWorkload(ctx context.Context) {
}

func (bd *backupDriver) runWorkload(ctx context.Context) error {
-return bd.sp.backup.workload.foregroundRun(ctx, bd.t, bd.c, bd.sp.hardware)
+return bd.sp.backup.workload.run(ctx, bd.t, bd.c, bd.sp.hardware)
}

// scheduleBackups begins the backup schedule.
@@ -182,11 +195,11 @@ func (bd *backupDriver) scheduleBackups(ctx context.Context) {
func (bd *backupDriver) monitorBackups(ctx context.Context) {
conn := bd.c.Conn(ctx, bd.t.L(), 1)
sql := sqlutils.MakeSQLRunner(conn)
-
for {
time.Sleep(1 * time.Minute)
var activeScheduleCount int
-sql.QueryRow(bd.t, `SELECT count(*) FROM [SHOW SCHEDULES] WHERE label ='schedule_cluster' and schedule_status='ACTIVE'`).Scan(&activeScheduleCount)
+scheduleCountQuery := fmt.Sprintf(`SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'`, scheduleLabel)
+sql.QueryRow(bd.t, scheduleCountQuery).Scan(&activeScheduleCount)
if activeScheduleCount < 2 {
bd.t.L().Printf(`First full backup still running`)
continue
@@ -196,7 +209,8 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) {
sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount)
bd.t.L().Printf(`%d scheduled backups taken`, backupCount)
if backupCount >= bd.sp.backup.backupsIncluded {
-sql.QueryRow(bd.t, `PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = 'schedule_cluster'`)
+pauseSchedulesQuery := fmt.Sprintf(`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel)
+sql.QueryRow(bd.t, pauseSchedulesQuery)
break
}
}
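
Two background details make the monitor loop above read cleanly. First, CREATE SCHEDULE ... FULL BACKUP '@weekly' registers a pair of schedules under one label, and (as the loop's own "First full backup still running" message implies) the incremental schedule does not become ACTIVE until the first full backup completes, hence the activeScheduleCount < 2 test. Second, with the new default of 48 backupsIncluded at a */5 cadence, the loop waits for roughly 48 x 5 = 240 minutes of scheduled backups before pausing. A hedged sketch of the ACTIVE-count check as a standalone helper:

    // Assumes: import (gosql "database/sql"; "fmt"); mirrors the query above.
    func firstFullBackupRunning(db *gosql.DB, label string) (bool, error) {
        q := fmt.Sprintf(`SELECT count(*) FROM [SHOW SCHEDULES] `+
            `WHERE label='%s' AND schedule_status='ACTIVE'`, label)
        var active int
        err := db.QueryRow(q).Scan(&active)
        // Fewer than two ACTIVE schedules: the paired incremental schedule
        // has not started, so the first full backup is still in flight.
        return active < 2, err
    }
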
@@ -223,19 +237,19 @@ func registerBackupFixtures(r registry.Registry) {
skip: "only for fixture generation",
},
{
-// 15 GB Backup Fixture. Note, this fixture is created every night to
+// 15 GB Backup Fixture. Note, this fixture is created weekly to
// ensure the fixture generation code works.
hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true, cpus: 4}),
backup: makeBackupFixtureSpecs(
scheduledBackupSpecs{
-crontab: "*/2 * * * *",
-ignoreExistingBackups: true,
+incrementalBackupCrontab: "*/2 * * * *",
+ignoreExistingBackups: true,
backupSpecs: backupSpecs{
backupsIncluded: 4,
workload: tpceRestore{customers: 1000}}}),
initFromBackupSpecs: backupSpecs{version: "v22.2.1", backupProperties: "inc-count=48"},
timeout: 2 * time.Hour,
tags: registry.Tags("aws"),
tags: registry.Tags("weekly", "aws-weekly"),
},
{
// 8TB Backup Fixture.
@@ -244,7 +258,7 @@ func registerBackupFixtures(r registry.Registry) {
backupSpecs: backupSpecs{
workload: tpceRestore{customers: 500000}}}),
timeout: 25 * time.Hour,
-initFromBackupSpecs: backupSpecs{version: "v22.2.1"},
+initFromBackupSpecs: backupSpecs{version: "v22.2.1", backupProperties: "inc-count=48"},
// add the weekly tags to allow an over 24 hour timeout.
tags: registry.Tags("weekly", "aws-weekly"),
skip: "only for fixture generation",
@@ -255,7 +269,7 @@ func registerBackupFixtures(r registry.Registry) {
backup: makeBackupFixtureSpecs(scheduledBackupSpecs{
backupSpecs: backupSpecs{
workload: tpceRestore{customers: 2000000}}}),
-initFromBackupSpecs: backupSpecs{version: "v22.2.1"},
+initFromBackupSpecs: backupSpecs{version: "v22.2.1", backupProperties: "inc-count=48"},
timeout: 48 * time.Hour,
// add the weekly tags to allow an over 24 hour timeout.
tags: registry.Tags("weekly", "aws-weekly"),
@@ -277,17 +291,19 @@ func registerBackupFixtures(r registry.Registry) {
bd := makeBackupDriver(t, c, bf)
bd.prepareCluster(ctx)
bd.initWorkload(ctx)
-m := c.NewMonitor(ctx)

workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()
workloadDoneCh := make(chan struct{})
+m := c.NewMonitor(workloadCtx)
+defer func() {
+workloadCancel()
+m.Wait()
+}()
m.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
-err := bd.runWorkload(workloadCtx)
-// The workload should only return an error if the roachtest driver cancels the
-// workloadCtx is cancelled after the backup schedule completes.
-if err != nil && workloadCtx.Err() == nil {
+err := bd.runWorkload(ctx)
+// We expect the workload to return a context cancelled error because
+// the roachtest driver cancels the monitor's context after the backup
+// schedule completes.
+if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
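
The hunk above is the behavioral core of the test-driver change: the monitor now owns workloadCtx, and the deferred workloadCancel/m.Wait pair stops and drains the workload goroutine on every exit path, with a post-cancellation error treated as expected. A stripped-down sketch of the same shape with a plain goroutine (driveBackups and runWorkload are hypothetical names):

    // Assumes: import "context".
    func driveBackups(ctx context.Context, runWorkload func(context.Context) error) error {
        workloadCtx, cancel := context.WithCancel(ctx)
        errCh := make(chan error, 1)
        go func() {
            err := runWorkload(workloadCtx)
            if err != nil && workloadCtx.Err() == nil {
                errCh <- err // failed on its own, before cancellation
                return
            }
            errCh <- nil // a cancellation-driven exit is the expected path
        }()
        // ... schedule backups and poll until the fixture is complete ...
        cancel()       // backups done: tell the workload to stop
        return <-errCh // drain the goroutine, as the deferred m.Wait() does
    }
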
39 changes: 24 additions & 15 deletions pkg/cmd/roachtest/tests/restore.go
@@ -524,10 +524,10 @@ func (hw hardwareSpecs) String(verbose bool) string {
}

func (hw hardwareSpecs) getWorkloadNode() int {
-if hw.workloadNode {
-return hw.nodes + 1
+if !hw.workloadNode {
+panic(`this test does not have a workload node`)
}
-return 0
+return hw.nodes + 1
}

func (hw hardwareSpecs) getCRDBNodes() option.NodeListOption {
@@ -673,35 +673,44 @@ type backupWorkload interface {
// DatabaseName specifies the name of the database the workload will operate on.
DatabaseName() string

-// initWorkload loads the cluster with the workload's schema and initial data.
-initWorkload(ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs)
+// init loads the cluster with the workload's schema and initial data.
+init(ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs)

-// foregroundRun begins a foreground workload that runs indefinitely until the passed context
+// run begins a workload that runs indefinitely until the passed context
// is cancelled.
-foregroundRun(ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs) error
+run(ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs) error
}

type tpceRestore struct {
customers int
+spec *tpceSpec
}

-func (tpce tpceRestore) initWorkload(
+func (tpce tpceRestore) getSpec(
ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs,
-) {
+) *tpceSpec {
+if tpce.spec != nil {
+return tpce.spec
+}
tpceSpec, err := initTPCESpec(ctx, t.L(), c, sp.getWorkloadNode(), sp.getCRDBNodes())
require.NoError(t, err)
-tpceSpec.init(ctx, t, c, tpceCmdOptions{
+return tpceSpec
+}
+
+func (tpce tpceRestore) init(
+ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs,
+) {
+spec := tpce.getSpec(ctx, t, c, sp)
+spec.init(ctx, t, c, tpceCmdOptions{
customers: tpce.customers,
racks: sp.nodes})
}

-func (tpce tpceRestore) foregroundRun(
+func (tpce tpceRestore) run(
ctx context.Context, t test.Test, c cluster.Cluster, sp hardwareSpecs,
) error {
-tpceSpec, err := initTPCESpec(ctx, t.L(), c, sp.getWorkloadNode(), sp.getCRDBNodes())
-require.NoError(t, err)
-
-_, err = tpceSpec.run(ctx, t, c, tpceCmdOptions{
+spec := tpce.getSpec(ctx, t, c, sp)
+_, err := spec.run(ctx, t, c, tpceCmdOptions{
// Set the duration to be a week to ensure the workload never exits early.
duration: time.Hour * 7 * 24,
customers: tpce.customers,
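
A note on the getSpec refactor that closes this file: init and run now share one path for obtaining a tpceSpec, but getSpec has a value receiver and never writes the lazily built spec back to tpce.spec, so the cached fast path only applies when the struct is constructed with the field already populated; a nil-spec tpceRestore pays the initTPCESpec cost on each call (a reading of this diff, not a documented guarantee). A hedged usage sketch:

    // Hypothetical call sites. With spec left nil, init and run each build
    // their own tpceSpec via initTPCESpec.
    tpce := tpceRestore{customers: 25000}
    tpce.init(ctx, t, c, sp) // builds a spec internally
    if err := tpce.run(ctx, t, c, sp); err != nil { // builds another, then runs
        t.Fatal(err)
    }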
