diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5cccc7d139bf..ce5b5782aeb6 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -250,6 +250,7 @@ go_test( "//pkg/sql/types", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 39aefc5108fb..759df1a35439 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) { feed := testFeed.(cdctest.EnterpriseTestFeed) require.NoError(t, feed.Pause()) - + jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID()) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID())) counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 69a7c6ed399e..24557ddcada4 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -417,26 +417,57 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 243cab6fd063..a312245fd419 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -21,6 +21,7 @@ import ( "runtime/pprof" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } + // Wait for job to be adopted so that we have the + // lease and can move to succeeded. + resumeSignaler.WaitForResumeStarted() + // PauseRequested fails after job is successful. if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -3109,6 +3115,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3125,10 +3160,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3160,9 +3196,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3175,6 +3218,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3208,6 +3252,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index af16ea4a9470..ce7d1a8e3e41 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1576,7 +1576,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index 1872596d41c7..be8febec0f5b 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -89,6 +89,19 @@ func waitForJobToHaveStatus( }, 2*time.Minute) } +func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + testutils.SucceedsWithin(t, func() error { + var sessionID []byte + var instanceID gosql.NullInt64 + db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID) + if sessionID == nil && !instanceID.Valid { + return nil + } + return errors.Newf("job %d still has claim information") + }, 2*time.Minute) +} + // RunJob runs the provided job control statement, initializing, notifying and // closing the chan at the passed pointer (see below for why) and returning the // jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard