From f0b9e4d59d945abf639260e451b7031af7bbb1af Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 9 Nov 2022 00:17:41 +0000 Subject: [PATCH 1/2] jobs: clear job claim after execution Since #89014 the job system reset a job's claim when transitioning it from pause-requested to paused and from cancel-requested to reverting. The job system signals these transitions to the running Resumer by cancelling the job's context and does not wait for the resumer to exit. Once the claim is clear, another node can adopt the job and start running it's OnFailOrCancel callback. As a result, clearing the context makes it more likely that OnFailOrCancel executions will overlap with Resume executions. In general, Jobs need to assume that Resume may still be running while OnFailOrCancel is called. But, making it more likely isn't in our interest. Here, we only clear the lease when we exit the job state machine. This makes it much more likely that OnFailOrCancel doesn't start until Resume has returned. Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/alter_changefeed_test.go | 3 +- pkg/jobs/adopt.go | 41 +++++++++++++-- pkg/jobs/jobs_test.go | 51 +++++++++++++++++-- pkg/jobs/registry.go | 2 +- pkg/testutils/jobutils/jobs_verification.go | 13 +++++ 6 files changed, 101 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 86e691fdce26..a4a188518610 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -242,6 +242,7 @@ go_test( "//pkg/sql/types", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 296a97cf94d4..39264a0f8a16 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) { feed := testFeed.(cdctest.EnterpriseTestFeed) require.NoError(t, feed.Pause()) - + jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID()) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID())) counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index da5da78e548a..5aa6b1021464 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -415,26 +415,57 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 8c1b6aef93df..816610112e6a 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1204,10 +1205,11 @@ func TestJobLifecycle(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -1462,6 +1464,10 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } + // Wait for job to be adopted so that we have the + // lease and can move to succeeded. + resumeSignaler.WaitForResumeStarted() + // PauseRequested fails after job is successful. if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -3107,6 +3113,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3123,10 +3158,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3158,9 +3194,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3173,6 +3216,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3206,6 +3250,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a9c13030d0de..b84f73733a0e 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1529,7 +1529,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index e0b3334bd898..502db6cef812 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -89,6 +89,19 @@ func waitForJobToHaveStatus( }, 2*time.Minute) } +func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + testutils.SucceedsWithin(t, func() error { + var sessionID []byte + var instanceID gosql.NullInt64 + db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID) + if sessionID == nil && !instanceID.Valid { + return nil + } + return errors.Newf("job %d still has claim information") + }, 2*time.Minute) +} + // RunJob runs the provided job control statement, initializing, notifying and // closing the chan at the passed pointer (see below for why) and returning the // jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard From e1e1965e62815a6c2dd581298d0902e6733611cd Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 10 Nov 2022 12:09:51 +0000 Subject: [PATCH 2/2] jobs: add log scopes to tests Release note: None --- pkg/jobs/delegate_control_test.go | 7 +++++++ pkg/jobs/lease_test.go | 2 ++ pkg/jobs/registry_test.go | 2 ++ 3 files changed, 11 insertions(+) diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 0272f38acec2..e6f2ceb045b5 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -24,12 +24,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestScheduleControl(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelper(t) defer cleanup() @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) { func TestJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil) defer cleanup() @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) { // jobs prior to executing the control command. func TestFilterJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { func TestJobControlByType(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { diff --git a/pkg/jobs/lease_test.go b/pkg/jobs/lease_test.go index ba9c2d027fcf..816882de5ee9 100644 --- a/pkg/jobs/lease_test.go +++ b/pkg/jobs/lease_test.go @@ -20,12 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestJobsTableClaimFamily(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index acfab16efd1e..af56826fbc14 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -974,6 +974,7 @@ func TestRunWithoutLoop(t *testing.T) { func TestJobIdleness(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() intervalOverride := time.Millisecond @@ -1112,6 +1113,7 @@ func TestJobIdleness(t *testing.T) { // allow other job registries in the cluster to claim and run this job. func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) intervalOverride := time.Millisecond s, db, _ := serverutils.StartServer(t, base.TestServerArgs{