Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: jobs: clear job claim after execution #93475

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ go_test(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())

jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID())
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
41 changes: 36 additions & 5 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,26 +415,57 @@ func (r *Registry) runJob(
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
// NB: After this point, the job may no longer have the claim
// and further updates to the job record from this node may
// fail.
r.maybeClearLease(job, err)
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
return err
}

const clearClaimQuery = `
UPDATE system.jobs
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`

// maybeClearLease clears the claim on the given job, provided that
// the current lease matches our liveness Session.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
if jobErr == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noob q: why don't we clear the lease if jobErr == nil? Is a job only ever in a pause/cancel requested state if the job registry interrupts job by a context cancellation, bubbling up in a non nil jobErr? I'm just not sure when the state machine returns non-nil errors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the job completes it will exit with a nil error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ty. On master, I may eventually add some documentation to reflect this.

return
}

// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
}
log.VEventf(ctx, 2, "cleared leases for %d jobs", n)
})
}

const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END,
num_runs = 0,
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
last_run = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
51 changes: 48 additions & 3 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"runtime/pprof"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3114,6 +3120,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) {
require.Equal(t, expectedFiles, filesInZip)
}

type resumeStartedSignaler struct {
syncutil.Mutex
cond *sync.Cond
isStarted bool
}

func newResumeStartedSignaler() *resumeStartedSignaler {
ret := &resumeStartedSignaler{}
ret.cond = sync.NewCond(&ret.Mutex)
return ret

}

func (r *resumeStartedSignaler) SignalResumeStarted() {
r.Lock()
r.isStarted = true
r.cond.Signal()
r.Unlock()
}

func (r *resumeStartedSignaler) WaitForResumeStarted() {
r.Lock()
for !r.isStarted {
r.cond.Wait()
}
r.isStarted = false
r.Unlock()
}

// TestPauseReason tests pausing a job with a user specified reason.
func TestPauseReason(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -3130,10 +3165,11 @@ func TestPauseReason(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -3165,9 +3201,16 @@ func TestPauseReason(t *testing.T) {
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 0 {
return nil
}
return errors.New("still waiting for claim to clear")
})
}
mustHaveClaim := func() {
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
Expand All @@ -3180,6 +3223,7 @@ func TestPauseReason(t *testing.T) {
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand Down Expand Up @@ -3213,6 +3257,7 @@ func TestPauseReason(t *testing.T) {

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j
return
}
if updateErr != nil {
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err)
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr)
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
t.Helper()
testutils.SucceedsWithin(t, func() error {
var sessionID []byte
var instanceID gosql.NullInt64
db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID)
if sessionID == nil && !instanceID.Valid {
return nil
}
return errors.Newf("job %d still has claim information")
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
Expand Down