Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: jobs: remove FOR UPDATE clause when updating job #68244

Merged
merged 1 commit into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,15 +599,20 @@ func (r *Registry) LoadJobWithTxn(

// UpdateJobWithTxn calls the Update method on an existing job with jobID, using
// a transaction passed in the txn argument. Passing a nil transaction means
// that a txn will be automatically created.
// that a txn will be automatically created. The useReadLock parameter will
// have the update acquire an exclusive lock on the job row when reading. This
// can help eliminate restarts in the face of concurrent updates at the cost of
// locking the row from readers. Most updates of a job do not expect contention
// and may do extra work and thus should not do locking. Cases where the job
// is used to coordinate resources from multiple nodes may benefit from locking.
func (r *Registry) UpdateJobWithTxn(
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, updateFunc UpdateFn,
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, useReadLock bool, updateFunc UpdateFn,
) error {
j := &Job{
id: jobID,
registry: r,
}
return j.Update(ctx, txn, updateFunc)
return j.update(ctx, txn, useReadLock, updateFunc)
}

// DefaultCancelInterval is a reasonable interval at which to poll this node
Expand Down
36 changes: 29 additions & 7 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,21 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU
// Note that there are various convenience wrappers (like FractionProgressed)
// defined in jobs.go.
func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error {
const useReadLock = false
return j.update(ctx, txn, useReadLock, updateFn)
}

func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn UpdateFn) error {
var payload *jobspb.Payload
var progress *jobspb.Progress

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE"
if j.sessionID != "" {
stmt = "SELECT status, payload, progress, claim_session_id FROM system." +
"jobs WHERE id = $1 FOR UPDATE"
}
var err error
var row tree.Datums
row, err = j.registry.ex.QueryRowEx(
ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID(),
ctx, "log-job", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(),
)
if err != nil {
return err
Expand Down Expand Up @@ -253,3 +255,23 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error
}
return nil
}

// getSelectStmtForJobUpdate constructs the select statement used in Job.update.
func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string {
const (
selectWithoutSession = `SELECT status, payload, progress`
selectWithSession = selectWithoutSession + `, claim_session_id`
from = ` FROM system.jobs WHERE id = $1`
fromForUpdate = from + ` FOR UPDATE`
)
if hasSessionID {
if useReadLock {
return selectWithSession + fromForUpdate
}
return selectWithSession + from
}
if useReadLock {
return selectWithoutSession + fromForUpdate
}
return selectWithoutSession + from
}
3 changes: 2 additions & 1 deletion pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func (j *SeqChunkProvider) RequestChunk(
ju.UpdateProgress(progress)
return nil
}
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, resolveChunkFunc)
const useReadLock = true
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, useReadLock, resolveChunkFunc)
if err != nil {
return err
}
Expand Down