diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 57057c9370f3..da5da78e548a 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -432,7 +432,9 @@ const pauseAndCancelUpdate = ` ELSE status END, num_runs = 0, - last_run = NULL + last_run = NULL, + claim_session_id = NULL, + claim_instance_id = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 9783c1e91e63..782d75c26c3a 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -3138,9 +3138,31 @@ func TestPauseReason(t *testing.T) { _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) require.NoError(t, err) + countRowsWithClaimInfo := func() int { + t.Helper() + n := 0 + tdb.QueryRow(t, + "SELECT count(*) FROM system.jobs "+ + "WHERE id = $1 AND (claim_session_id IS NOT NULL OR claim_instance_id IS NOT NULL)", + jobID).Scan(&n) + return n + } + mustNotHaveClaim := func() { + require.Equal(t, 0, countRowsWithClaimInfo()) + } + mustHaveClaim := func() { + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 1 { + return nil + } + return errors.New("still waiting for claim info") + }) + } + // First wait until the job is running q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) + mustHaveClaim() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3164,6 +3186,7 @@ func TestPauseReason(t *testing.T) { require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "for testing")) tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}}) checkStatusAndPauseReason(t, jobID, "paused", "for testing") + mustNotHaveClaim() } { @@ -3172,12 +3195,14 @@ func TestPauseReason(t *testing.T) { tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) checkStatusAndPauseReason(t, jobID, "running", "for testing") + mustHaveClaim() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "second time")) tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}}) checkStatusAndPauseReason(t, jobID, "paused", "second time") + mustNotHaveClaim() } }