diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 24557ddcada4..f44989b39e5e 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -434,8 +434,7 @@ const clearClaimQuery = ` SET claim_session_id = NULL, claim_instance_id = NULL WHERE id = $1 AND claim_session_id = $2 - AND claim_instance_id = $3 - AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + AND claim_instance_id = $3` // maybeClearLease clears the claim on the given job, provided that // the current lease matches our liveness Session. @@ -443,13 +442,16 @@ func (r *Registry) maybeClearLease(job *Job, jobErr error) { if jobErr == nil { return } + r.clearLeaseForJobID(job.ID(), nil /* txn */) +} +func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) { // We use the serverCtx here rather than the context from the // caller since the caller's context may have been canceled. r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { - n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, - clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID()) if err != nil { log.Warningf(ctx, "could not clear job claim: %s", err.Error()) return @@ -497,11 +499,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes statusString := *row[1].(*tree.DString) switch Status(statusString) { case StatusPaused: - r.cancelRegisteredJobContext(id) + if !r.cancelRegisteredJobContext(id) { + // If we didn't already have a running job for this lease, + // clear out the lease here since it won't be + // cleared out on Resume exit. 
+ r.clearLeaseForJobID(id, txn) + } log.Infof(ctx, "job %d, session %s: paused", id, s.ID()) case StatusReverting: if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error { - r.cancelRegisteredJobContext(id) + if !r.cancelRegisteredJobContext(id) { + // If we didn't already have a running job for this + // lease, clear out the lease here since it won't be + // cleared out on Resume exit. + // + // NB: This working as part of the update depends on + // the fact that the job struct does not have a + // claim set and thus won't validate the claim on + // update. + r.clearLeaseForJobID(id, txn) + } md.Payload.Error = errJobCanceled.Error() encodedErr := errors.EncodeError(ctx, errJobCanceled) md.Payload.FinalResumeError = &encodedErr diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index dfa1aa84913f..09babfe144f5 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1193,6 +1193,42 @@ func TestRegistryLifecycle(t *testing.T) { <-completeCh }) + t.Run("job with created by fields", func(t *testing.T) { + createdByType := "internal_test" + rts := registryTestSuite{} + rts.setUp(t) + defer rts.tearDown() + + resumerJob := make(chan *jobs.Job, 1) + jobs.RegisterConstructor( + jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return jobs.FakeResumer{ + OnResume: func(ctx context.Context) error { + resumerJob <- j + return nil + }, + } + }, jobs.UsesTenantCostControl) + + jobID := rts.registry.MakeJobID() + record := jobs.Record{ + Details: jobspb.BackupDetails{}, + Progress: jobspb.BackupProgress{}, + CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123}, + } + job, err := rts.registry.CreateAdoptableJobWithTxn(rts.ctx, record, jobID, nil /* txn */) + require.NoError(t, err) + + loadedJob, err := rts.registry.LoadJob(rts.ctx, jobID) + require.NoError(t, err) + require.NotNil(t, loadedJob.CreatedBy()) + require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy()) + 
rts.registry.TestingNudgeAdoptionQueue() + resumedJob := <-resumerJob + require.NotNil(t, resumedJob.CreatedBy()) + require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy()) + + }) } func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) { @@ -1220,6 +1256,10 @@ func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int } } +// TestJobLifecycle tests the invariants about the job lifecycle +// queries. It does not depend on the registry's job management tasks +// and assumes that it maintains the lease on the job through all +// state transitions. func TestJobLifecycle(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1227,14 +1267,16 @@ func TestJobLifecycle(t *testing.T) { ctx := context.Background() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{DisableRegistryLifecycleManagent: true} + s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) registry := s.JobRegistry().(*jobs.Registry) createJob := func(record jobs.Record) (*jobs.Job, expectation) { beforeTime := timeutil.Now() - job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */) + job, err := registry.CreateJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */) require.NoError(t, err) payload := job.Payload() return job, expectation{ @@ -1256,38 +1298,6 @@ func TestJobLifecycle(t *testing.T) { return createJob(defaultRecord) } - done := make(chan struct{}) - defer close(done) - resumeSignaler := newResumeStartedSignaler() - jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return jobs.FakeResumer{ - OnResume: func(ctx context.Context) error { - resumeSignaler.SignalResumeStarted() - select { - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } - }, - } - }, 
jobs.UsesTenantCostControl) - - startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) { - beforeTime := timeutil.Now() - job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record) - if err != nil { - t.Fatal(err) - } - payload := job.Payload() - return job, expectation{ - DB: sqlDB, - Record: record, - Type: payload.Type(), - Before: beforeTime, - } - } - t.Run("valid job lifecycles succeed", func(t *testing.T) { // Woody is a successful job. woodyPride, _ := username.MakeSQLUsernameFromUserInput("Woody Pride", username.PurposeValidation) @@ -1495,7 +1505,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("cancelable jobs can be paused until finished", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) @@ -1517,10 +1527,6 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } - // Wait for job to be adopted so that we have the - // lease and can move to succeeded. - resumeSignaler.WaitForResumeStarted() - // PauseRequested fails after job is successful. 
if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -1532,7 +1538,7 @@ func TestJobLifecycle(t *testing.T) { t.Run("cancelable jobs can be canceled until finished", func(t *testing.T) { { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil { t.Fatal(err) } @@ -1542,7 +1548,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := job.Started(ctx); err != nil { t.Fatal(err) } @@ -1555,7 +1561,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) } @@ -1571,7 +1577,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := job.Succeeded(ctx); err != nil { t.Fatal(err) } @@ -1584,7 +1590,7 @@ func TestJobLifecycle(t *testing.T) { t.Run("unpaused jobs cannot be resumed", func(t *testing.T) { { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil { t.Fatal(err) } @@ -1594,7 +1600,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := job.Succeeded(ctx); err != nil { t.Fatal(err) } @@ -1715,7 +1721,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("progress on paused job fails", func(t *testing.T) { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) } @@ -1727,7 +1733,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("progress on canceled job fails", func(t *testing.T) { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.CancelRequested(ctx, nil, 
job.ID()); err != nil { t.Fatal(err) } @@ -1759,7 +1765,7 @@ func TestJobLifecycle(t *testing.T) { updateStatusStmt := `UPDATE system.jobs SET status = $1 WHERE id = $2` t.Run("set details works", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) newDetails := jobspb.ImportDetails{URIs: []string{"new"}} exp.Record.Details = newDetails @@ -1775,7 +1781,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set details fails", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) _, err := exp.DB.Exec(updateStatusStmt, jobs.StatusCancelRequested, job.ID()) require.NoError(t, err) @@ -1784,7 +1790,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set progress works", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) newProgress := jobspb.ImportProgress{ResumePos: []int64{42}} exp.Record.Progress = newProgress @@ -1799,47 +1805,13 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set progress fails", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) _, err := exp.DB.Exec(updateStatusStmt, jobs.StatusPauseRequested, job.ID()) require.NoError(t, err) require.Error(t, job.SetProgress(ctx, nil /* txn */, jobspb.ImportProgress{ResumePos: []int64{42}})) require.NoError(t, exp.verify(job.ID(), jobs.StatusPauseRequested)) }) - - t.Run("job with created by fields", func(t *testing.T) { - createdByType := "internal_test" - - resumerJob := make(chan *jobs.Job, 1) - jobs.RegisterConstructor( - jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return jobs.FakeResumer{ - OnResume: func(ctx context.Context) error { - 
resumerJob <- j - return nil - }, - } - }, jobs.UsesTenantCostControl) - - jobID := registry.MakeJobID() - record := jobs.Record{ - Details: jobspb.BackupDetails{}, - Progress: jobspb.BackupProgress{}, - CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123}, - } - job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */) - require.NoError(t, err) - - loadedJob, err := registry.LoadJob(ctx, jobID) - require.NoError(t, err) - require.NotNil(t, loadedJob.CreatedBy()) - require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy()) - registry.TestingNudgeAdoptionQueue() - resumedJob := <-resumerJob - require.NotNil(t, resumedJob.CreatedBy()) - require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy()) - - }) } // TestShowJobs manually inserts a row into system.jobs and checks that the diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index ce7d1a8e3e41..30fb3ce91d13 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -726,6 +726,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) { // jobs if it observes a failure. Otherwise it starts all the main daemons of // registry that poll the jobs table and start/cancel/gc jobs. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { + if r.knobs.DisableRegistryLifecycleManagent { + return nil + } + // Since the job polling system is outside user control, exclude it from cost // accounting and control. Individual jobs are not part of this exclusion. 
ctx = multitenant.WithTenantCostControlExemption(ctx) @@ -1504,12 +1508,14 @@ func (r *Registry) unregister(jobID jobspb.JobID) { } } -func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) { +func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) bool { r.mu.Lock() defer r.mu.Unlock() - if aj, ok := r.mu.adoptedJobs[jobID]; ok { + aj, ok := r.mu.adoptedJobs[jobID] + if ok { aj.cancel() } + return ok } func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) { diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 7fe6d8e54a12..6e28862b8142 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -72,6 +72,9 @@ type TestingKnobs struct { // DisableAdoptions disables job adoptions. DisableAdoptions bool + // DisableRegistryLifecycleManagent makes Registry.Start return immediately without starting its daemons. + DisableRegistryLifecycleManagent bool + // BeforeWaitForJobsQuery is called once per invocation of the // poll-show-jobs query in WaitForJobs. BeforeWaitForJobsQuery func()