Skip to content

Commit

Permalink
Merge pull request #89936 from miretskiy/backport22.1-89014
Browse files Browse the repository at this point in the history
release-22.1: jobs: Clear out claim info when pausing
  • Loading branch information
miretskiy authored Oct 26, 2022
2 parents e80e1df + ef0e512 commit 3df2af6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,9 @@ const pauseAndCancelUpdate = `
ELSE status
END,
num_runs = 0,
last_run = NULL
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
25 changes: 25 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3202,9 +3202,31 @@ func TestPauseReason(t *testing.T) {
_, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */)
require.NoError(t, err)

countRowsWithClaimInfo := func() int {
t.Helper()
n := 0
tdb.QueryRow(t,
"SELECT count(*) FROM system.jobs "+
"WHERE id = $1 AND (claim_session_id IS NOT NULL OR claim_instance_id IS NOT NULL)",
jobID).Scan(&n)
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
}
mustHaveClaim := func() {
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
}
return errors.New("still waiting for claim info")
})
}

// First wait until the job is running
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand All @@ -3228,6 +3250,7 @@ func TestPauseReason(t *testing.T) {
require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "for testing"))
tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}})
checkStatusAndPauseReason(t, jobID, "paused", "for testing")
mustNotHaveClaim()
}

{
Expand All @@ -3236,12 +3259,14 @@ func TestPauseReason(t *testing.T) {
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
require.NoError(t, registry.PauseRequested(ctx, nil, jobID, "second time"))
tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}})
checkStatusAndPauseReason(t, jobID, "paused", "second time")
mustNotHaveClaim()
}
}

Expand Down

0 comments on commit 3df2af6

Please sign in to comment.