From aab9608e982895613ce13a028151a2ac9fe13e76 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 19 Aug 2020 11:16:05 -0400
Subject: [PATCH] bulkio: Fix transaction semantics in job scheduler.

Use a transaction when querying for the schedules to run. In addition,
ensure that a single bad schedule does not cause all of the previous
work to be wasted by using transaction savepoints.

Release Notes: None
---
 pkg/jobs/job_scheduler.go      | 22 +++++++--
 pkg/jobs/job_scheduler_test.go | 82 +++++++++++++++++++++++++++++++---
 2 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go
index 6ecc863c05fe..e1752d2c1d0d 100644
--- a/pkg/jobs/job_scheduler.go
+++ b/pkg/jobs/job_scheduler.go
@@ -72,7 +72,7 @@ const allSchedules = 0
 // scheduled jobs that should be started.
 func getFindSchedulesStatement(env scheduledjobs.JobSchedulerEnv, maxSchedules int64) string {
 	limitClause := ""
-	if maxSchedules > 0 {
+	if maxSchedules != allSchedules {
 		limitClause = fmt.Sprintf("LIMIT %d", maxSchedules)
 	}
 
@@ -236,8 +236,10 @@ func (s *jobScheduler) executeSchedules(
 	defer stats.updateMetrics(&s.metrics)
 
 	findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
-	rows, cols, err := s.InternalExecutor.QueryWithCols(ctx, "find-scheduled-jobs", nil,
-		sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
+	rows, cols, err := s.InternalExecutor.QueryWithCols(
+		ctx, "find-scheduled-jobs",
+		txn,
+		sqlbase.InternalExecutorSessionDataOverride{User: security.NodeUser},
 		findSchedulesStmt)
 
 	if err != nil {
@@ -252,8 +254,20 @@
 			continue
 		}
 
+		sp, err := txn.CreateSavepoint(ctx)
+		if err != nil {
+			return err
+		}
+
 		if err := s.processSchedule(ctx, schedule, numRunning, stats, txn); err != nil {
-			// We don't know if txn is good at this point, so bail out.
+			log.Errorf(ctx, "error processing schedule %d: %+v", schedule.ScheduleID(), err)
+
+			if err := txn.RollbackToSavepoint(ctx, sp); err != nil {
+				return errors.Wrapf(err, "failed to rollback savepoint for schedule %d", schedule.ScheduleID())
+			}
+		}
+
+		if err := txn.ReleaseSavepoint(ctx, sp); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go
index d4dee5974d9c..1c8582c23a4c 100644
--- a/pkg/jobs/job_scheduler_test.go
+++ b/pkg/jobs/job_scheduler_test.go
@@ -26,7 +26,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -43,8 +42,6 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.WithIssue(t, 52959)
-
 	h, cleanup := newTestHelper(t)
 	defer cleanup()
 
@@ -84,7 +81,10 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) {
 			// The job should not run -- it should be rescheduled `recheckJobAfter` time in the
 			// future.
 			s := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
-			require.NoError(t, s.executeSchedules(ctx, allSchedules, nil))
+			require.NoError(t,
+				h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+					return s.executeSchedules(ctx, allSchedules, txn)
+				}))
 
 			if wait == jobspb.ScheduleDetails_WAIT {
 				expectedRunTime = h.env.Now().Add(recheckRunningAfter)
@@ -138,7 +138,10 @@ func TestJobSchedulerExecutesAfterTerminal(t *testing.T) {
 
 		// Execute the job and verify it has the next run scheduled.
 		s := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
-		require.NoError(t, s.executeSchedules(ctx, allSchedules, nil))
+		require.NoError(t,
+			h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+				return s.executeSchedules(ctx, allSchedules, txn)
+			}))
 
 		expectedRunTime = cronexpr.MustParse("@hourly").Next(h.env.Now())
 		loaded = h.loadSchedule(t, j.ScheduleID())
@@ -175,7 +178,10 @@ func TestJobSchedulerExecutesAndSchedulesNextRun(t *testing.T) {
 
 	// Execute the job and verify it has the next run scheduled.
 	s := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
-	require.NoError(t, s.executeSchedules(ctx, allSchedules, nil))
+	require.NoError(t,
+		h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+			return s.executeSchedules(ctx, allSchedules, txn)
+		}))
 
 	expectedRunTime = cronexpr.MustParse("@hourly").Next(h.env.Now())
 	loaded = h.loadSchedule(t, j.ScheduleID())
@@ -431,6 +437,70 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
 	stopper.Stop(ctx)
 }
 
+// returnErrorExecutor counts the number of times it is
+// called, and always returns an error.
+type returnErrorExecutor struct {
+	numCalls int
+}
+
+func (e *returnErrorExecutor) ExecuteJob(
+	_ context.Context,
+	_ *scheduledjobs.JobExecutionConfig,
+	_ scheduledjobs.JobSchedulerEnv,
+	schedule *ScheduledJob,
+	_ *kv.Txn,
+) error {
+	e.numCalls++
+	return errors.Newf("error for schedule %d", schedule.ScheduleID())
+}
+
+func (e *returnErrorExecutor) NotifyJobTermination(
+	_ context.Context,
+	_ int64,
+	_ Status,
+	_ scheduledjobs.JobSchedulerEnv,
+	_ *ScheduledJob,
+	_ sqlutil.InternalExecutor,
+	_ *kv.Txn,
+) error {
+	return nil
+}
+
+func (e *returnErrorExecutor) Metrics() metric.Struct {
+	return nil
+}
+
+var _ ScheduledJobExecutor = &returnErrorExecutor{}
+
+func TestJobSchedulerToleratesBadSchedules(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	h, cleanup := newTestHelper(t)
+	defer cleanup()
+
+	ctx := context.Background()
+
+	const executorName = "return_error"
+	ex := &returnErrorExecutor{}
+	defer registerScopedScheduledJobExecutor(executorName, ex)()
+
+	// Create few one-off schedules.
+	const numJobs = 5
+	scheduleRunTime := h.env.Now().Add(time.Hour)
+	for i := 0; i < numJobs; i++ {
+		s := h.newScheduledJobForExecutor("schedule", executorName, nil)
+		s.SetNextRun(scheduleRunTime)
+		require.NoError(t, s.Create(ctx, h.cfg.InternalExecutor, nil))
+	}
+	h.env.SetTime(scheduleRunTime.Add(time.Second))
+	daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
+	require.NoError(t,
+		h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+			return daemon.executeSchedules(ctx, numJobs, txn)
+		}))
+	require.Equal(t, numJobs, ex.numCalls)
+}
+
 func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
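
For readers skimming the diff, below is a minimal, self-contained sketch (not part of the patch) of the per-schedule savepoint pattern that executeSchedules now follows: each schedule is processed under its own savepoint on the caller-supplied *kv.Txn, so a failing schedule rolls back only its own writes instead of aborting the transaction and losing the work already done for earlier schedules. The helper name processAllWithSavepoints and the processOne callback are hypothetical stand-ins for jobScheduler.processSchedule; the CreateSavepoint/RollbackToSavepoint/ReleaseSavepoint calls are the kv.Txn methods used in the patch. On the caller side, the updated tests show the intended usage: wrap the call in cfg.DB.Txn so executeSchedules receives a live *kv.Txn.

package jobs

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// processAllWithSavepoints is an illustrative helper (hypothetical; not in the
// patch). It mirrors the loop in executeSchedules: each schedule gets its own
// savepoint, a failure is logged and rolled back to that savepoint, and the
// loop keeps going so earlier schedules' work in txn is preserved.
func processAllWithSavepoints(
	ctx context.Context,
	txn *kv.Txn,
	schedules []*ScheduledJob,
	processOne func(context.Context, *kv.Txn, *ScheduledJob) error,
) error {
	for _, schedule := range schedules {
		sp, err := txn.CreateSavepoint(ctx)
		if err != nil {
			return err
		}
		if err := processOne(ctx, txn, schedule); err != nil {
			// Undo only this schedule's writes; keep the rest of the batch.
			log.Errorf(ctx, "error processing schedule %d: %+v", schedule.ScheduleID(), err)
			if err := txn.RollbackToSavepoint(ctx, sp); err != nil {
				return err
			}
		}
		if err := txn.ReleaseSavepoint(ctx, sp); err != nil {
			return err
		}
	}
	return nil
}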