Skip to content

Commit

Permalink
Merge pull request #93475 from stevendanna/backport22.2-91563-91884-9…
Browse files Browse the repository at this point in the history
…2005-92121
  • Loading branch information
stevendanna authored Dec 13, 2022
2 parents ccce779 + 745959e commit 1f8f9fe
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 113 deletions.
25 changes: 24 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,34 @@ func TestAlterChangefeedTelemetry(t *testing.T) {

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
defer closeFeed(t, testFeed)

feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())

// The job system clears the lease asynchronously after
// cancellation. This lease clearing transaction can
// cause a restart in the alter changefeed
// transaction, which will lead to different feature
// counter counts. Thus, we want to wait for the lease
// clear. However, the lease clear isn't guaranteed to
// happen, so we only wait a few seconds for it.
waitForNoLease := func() {
	// Bound the total wait; the lease clear is best-effort and may never
	// happen, so give up quietly once the deadline has passed.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for ctx.Err() == nil {
		// A NULL claim_session_id scans into a nil slice, which signals
		// that the job row no longer carries a claim.
		var sessionID []byte
		sqlDB.QueryRow(t, `SELECT claim_session_id FROM system.jobs WHERE id = $1`, feed.JobID()).Scan(&sessionID)
		if sessionID == nil {
			return
		}
		time.Sleep(250 * time.Millisecond)
	}
}

waitForNoLease()
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
62 changes: 55 additions & 7 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,26 +415,59 @@ func (r *Registry) runJob(
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
// NB: After this point, the job may no longer have the claim
// and further updates to the job record from this node may
// fail.
r.maybeClearLease(job, err)
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
return err
}

// clearClaimQuery sets claim_session_id and claim_instance_id to NULL on
// the system.jobs row whose id is $1, but only if that row's current
// claim still matches the supplied session ID ($2) and instance ID ($3).
// A row claimed by a different session or instance is left untouched.
const clearClaimQuery = `
UPDATE system.jobs
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3`

// maybeClearLease releases the claim on job when the job finished with a
// non-nil jobErr. It does nothing when jobErr is nil. The claim is only
// removed if it still matches this registry's session and ID (see
// clearLeaseForJobID).
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
	if jobErr != nil {
		// No transaction is supplied here; the clear is issued with a
		// nil txn.
		r.clearLeaseForJobID(job.ID(), nil /* txn */)
	}
}

// clearLeaseForJobID runs clearClaimQuery for jobID, passing the session
// ID handed to the withSession callback and this registry's ID as the
// claim to match. The statement is executed with txn, which may be nil.
//
// Failures are not returned to the caller: they are logged as warnings
// and otherwise ignored.
func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) {
	clearClaim := func(ctx context.Context, s sqlliveness.Session) {
		asNode := sessiondata.InternalExecutorOverride{User: username.NodeUserName()}
		rowsAffected, err := r.ex.ExecEx(
			ctx, "clear-job-claim", txn, asNode,
			clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID(),
		)
		if err == nil {
			log.VEventf(ctx, 2, "cleared leases for %d jobs", rowsAffected)
			return
		}
		log.Warningf(ctx, "could not clear job claim: %s", err.Error())
	}

	// The registry's serverCtx is used instead of a caller-provided
	// context because the caller's context may already be canceled by
	// the time the claim needs to be cleared.
	r.withSession(r.serverCtx, clearClaim)
}

const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END,
num_runs = 0,
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
last_run = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down Expand Up @@ -464,11 +497,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this lease,
// clear out the lease here since it won't be cleared out
// on Resume exit.
r.clearLeaseForJobID(id, txn)
}
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this
// lease, clear out the lease here since it won't be
// cleared out on Resume exit.
//
// NB: This working as part of the update depends on
// the fact that the job struct does not have a
// claim set and thus won't validate the claim on
// update.
r.clearLeaseForJobID(id, txn)
}
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestScheduleControl(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelper(t)
defer cleanup()

Expand Down Expand Up @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) {

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil)
defer cleanup()

Expand Down Expand Up @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) {
// jobs prior to executing the control command.
func TestFilterJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down Expand Up @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {

func TestJobControlByType(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down
Loading

0 comments on commit 1f8f9fe

Please sign in to comment.