Skip to content

Commit

Permalink
Merge pull request #68244 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…21.1-67660

release-21.1: jobs: remove FOR UPDATE clause when updating job
  • Loading branch information
ajwerner authored Aug 10, 2021
2 parents 3168f9b + 84654db commit 4143139
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
11 changes: 8 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,15 +599,20 @@ func (r *Registry) LoadJobWithTxn(

// UpdateJobWithTxn calls the Update method on an existing job with jobID, using
// a transaction passed in the txn argument. Passing a nil transaction means
// that a txn will be automatically created.
// that a txn will be automatically created. The useReadLock parameter will
// have the update acquire an exclusive lock on the job row when reading. This
// can help eliminate restarts in the face of concurrent updates at the cost of
// locking the row from readers. Most updates of a job do not expect contention
// and may do extra work and thus should not do locking. Cases where the job
// is used to coordinate resources from multiple nodes may benefit from locking.
func (r *Registry) UpdateJobWithTxn(
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, updateFunc UpdateFn,
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, useReadLock bool, updateFunc UpdateFn,
) error {
j := &Job{
id: jobID,
registry: r,
}
return j.Update(ctx, txn, updateFunc)
return j.update(ctx, txn, useReadLock, updateFunc)
}

// DefaultCancelInterval is a reasonable interval at which to poll this node
Expand Down
36 changes: 29 additions & 7 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,21 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU
// Note that there are various convenience wrappers (like FractionProgressed)
// defined in jobs.go.
func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error {
const useReadLock = false
return j.update(ctx, txn, useReadLock, updateFn)
}

func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn UpdateFn) error {
var payload *jobspb.Payload
var progress *jobspb.Progress

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE"
if j.sessionID != "" {
stmt = "SELECT status, payload, progress, claim_session_id FROM system." +
"jobs WHERE id = $1 FOR UPDATE"
}
var err error
var row tree.Datums
row, err = j.registry.ex.QueryRowEx(
ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, j.ID(),
ctx, "log-job", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(),
)
if err != nil {
return err
Expand Down Expand Up @@ -253,3 +255,23 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error
}
return nil
}

// getSelectStmtForJobUpdate constructs the select statement used in Job.update.
func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string {
const (
selectWithoutSession = `SELECT status, payload, progress`
selectWithSession = selectWithoutSession + `, claim_session_id`
from = ` FROM system.jobs WHERE id = $1`
fromForUpdate = from + ` FOR UPDATE`
)
if hasSessionID {
if useReadLock {
return selectWithSession + fromForUpdate
}
return selectWithSession + from
}
if useReadLock {
return selectWithoutSession + fromForUpdate
}
return selectWithoutSession + from
}
3 changes: 2 additions & 1 deletion pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func (j *SeqChunkProvider) RequestChunk(
ju.UpdateProgress(progress)
return nil
}
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, resolveChunkFunc)
const useReadLock = true
err := j.Registry.UpdateJobWithTxn(ctx, j.JobID, txn, useReadLock, resolveChunkFunc)
if err != nil {
return err
}
Expand Down

0 comments on commit 4143139

Please sign in to comment.