Skip to content

Commit

Permalink
txnwait: don't reset pusheeTxnTimer when pusher is updated
Browse files Browse the repository at this point in the history
This shouldn't cause any issues with a functioning clock, but it means
that a repeatedly updated pusher can result in a large number of timer
resets. In tests without well functioning clocks, this could result in
starvation.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 8, 2019
1 parent 2d09a81 commit 600bdd5
Showing 1 changed file with 11 additions and 17 deletions.
28 changes: 11 additions & 17 deletions pkg/storage/txnwait/txnqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,27 +460,17 @@ func (q *Queue) MaybeWaitForPush(
}
}()
}
var pusheeTxnTimer timeutil.Timer
defer pusheeTxnTimer.Stop()
pusherPriority := req.PusherTxn.Priority
pusheePriority := req.PusheeTxn.Priority

first := true
var pusheeTxnTimer timeutil.Timer
defer pusheeTxnTimer.Stop()
// The first time we want to check the pushee's txn record immediately:
// the pushee might be gone by the time the pusher gets here if it cleaned
// itself up after the pusher saw an intent but before it entered this
// queue.
pusheeTxnTimer.Reset(0)
for {
// Set the timer to check for the pushee txn's expiration.
if !first {
expiration := TxnExpiration(pending.txn.Load().(*roachpb.Transaction)).GoTime()
now := q.store.Clock().Now().GoTime()
pusheeTxnTimer.Reset(expiration.Sub(now))
} else {
// The first time we want to check the pushee's txn record immediately:
// the pushee might be gone by the time the pusher gets here if it cleaned
// itself up after the pusher saw an intent but before it entered this
// queue.
pusheeTxnTimer.Reset(0)
first = false
}

select {
case <-ctx.Done():
// Caller has given up.
Expand Down Expand Up @@ -530,6 +520,10 @@ func (q *Queue) MaybeWaitForPush(
log.VEventf(ctx, 1, "pushing expired txn %s", req.PusheeTxn.ID.Short())
return nil, nil
}
// Set the timer to check for the pushee txn's expiration.
expiration := TxnExpiration(updatedPushee).GoTime()
now := q.store.Clock().Now().GoTime()
pusheeTxnTimer.Reset(expiration.Sub(now))

case updatedPusher := <-queryPusherCh:
switch updatedPusher.Status {
Expand Down

0 comments on commit 600bdd5

Please sign in to comment.