Skip to content

Commit

Permalink
intentresolver: use caller's context deadline in intent resolution
Browse files Browse the repository at this point in the history
This commit respects the caller's deadline when sending intent resolution
batches. It also sets a timeout using the previous value applied to all
batches to asynchronous intent resolution calls.

Fixes #36953.

Release note: None
  • Loading branch information
ajwerner committed Apr 25, 2019
1 parent fa9bc9e commit ca88a83
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
33 changes: 18 additions & 15 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import (

// TODO(ajwerner): Consider a more general purpose interface for this package.
// While several interface-oriented interfaces have been explored they all felt
// heavy and allocation intensive
// heavy and allocation intensive.

// TODO(ajwerner): Consider providing an interface which enables a single
// goroutine to dispatch a number of requests destined for different ranges to
Expand Down Expand Up @@ -271,11 +271,11 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
}
return nil
}
if !ba.deadline.IsZero() {
if !ba.sendDeadline.IsZero() {
actualSend := send
send = func(context.Context) error {
return contextutil.RunWithTimeout(
ctx, b.sendBatchOpName, timeutil.Until(ba.deadline), actualSend)
ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend)
}
}
b.cfg.Stopper.RunWorker(ctx, func(ctx context.Context) {
Expand Down Expand Up @@ -304,14 +304,14 @@ func addRequestToBatch(cfg *Config, now time.Time, ba *batch, r *request) (shoul
// Update the deadline for the batch if this requests's deadline is later
// than the current latest.
rDeadline, rHasDeadline := r.ctx.Deadline()
// If this is the first request
// If this is the first request or
if len(ba.reqs) == 0 ||
// or we have requests and we have a deadline and
(len(ba.reqs) > 0 && !ba.latestCtxDeadline.IsZero() &&
// this request doesn't have a deadline or has a later deadline
(!rHasDeadline || rDeadline.After(ba.latestCtxDeadline))) {
// there are already requests and there is a deadline and
(len(ba.reqs) > 0 && !ba.sendDeadline.IsZero() &&
// this request either doesn't have a deadline or has a later deadline,
(!rHasDeadline || rDeadline.After(ba.sendDeadline))) {
// set the deadline to this request's deadline.
ba.latestCtxDeadline = rDeadline
ba.sendDeadline = rDeadline
}

ba.reqs = append(ba.reqs, r)
Expand Down Expand Up @@ -438,16 +438,19 @@ type batch struct {
reqs []*request
size int // bytes

// latestCtxDeadline is the latest deadline read from a context corresponding
// to a request in reqs. It will be zero valued if any request does not
// contain a deadline.
latestCtxDeadline time.Time
// sendDeadline is the latest deadline reported by a request's context.
// It will be zero valued if any request does not contain a deadline.
sendDeadline time.Time

// idx is the batch's index in the batchQueue.
idx int

deadline time.Time
startTime time.Time
// deadline is the time at which this batch should be sent according to the
// Batcher's configuration.
deadline time.Time
// startTime is the time at which the first request was added to the batch.
startTime time.Time
// lastUpdated is the latest time when a request was added to the batch.
lastUpdated time.Time
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,13 @@ func TestBatchTimeout(t *testing.T) {
assert.Len(t, s.ba.Requests, 2)
s.respChan <- batchResp{}
case <-ctx1.Done():
// This case implies that the test did not exercise what was intended
// but that's okay, clean up the other request and return.
cancel2()
wg.Wait()
return
}
wg.Wait()
testutils.IsError(err1, context.DeadlineExceeded.Error())
assert.Nil(t, err2)
})
Expand Down
34 changes: 14 additions & 20 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,12 @@ const (
// TODO(bdarnell): how to determine best value?
defaultTaskLimit = 1000

// intentResolverTimeout is the timeout when processing a group of intents.
// The timeout prevents intent resolution from getting stuck. Since
// processing intents is best effort, we'd rather give up than wait too long
// (this helps avoid deadlocks during test shutdown).
intentResolverTimeout = 30 * time.Second

// gcTimeout is the timeout when processing gc of a batch of txn records.
// Since processing txn records is best effort, we'd rather give up than
// wait too long (this helps avoid deadlocks during test shutdown).
gcTimeout = 30 * time.Second
// asyncIntentResolutionTimeout is the timeout when processing a group of
// intents asynchronously. The timeout prevents intent async intent
// resolution from getting stuck. Since processing intents is best effort,
// we'd rather give up than wait too long (this helps avoid deadlocks during
// test shutdown).
asyncIntentResolutionTimeout = 30 * time.Second

// intentResolverBatchSize is the maximum number of intents that will be
// resolved in a single batch. Batches that span many ranges (which is
Expand Down Expand Up @@ -204,7 +200,6 @@ func New(c Config) *IntentResolver {
MaxIdle: c.MaxGCBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
BatchTimeout: gcTimeout,
})
batchSize := intentResolverBatchSize
if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
Expand All @@ -217,7 +212,6 @@ func New(c Config) *IntentResolver {
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
BatchTimeout: intentResolverTimeout,
})
return ir
}
Expand Down Expand Up @@ -510,10 +504,13 @@ func (ir *IntentResolver) CleanupIntentsAsync(
now := ir.clock.Now()
for _, item := range intents {
if err := ir.runAsyncTask(ctx, allowSyncProcessing, func(ctx context.Context) {
if _, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH); err != nil {
if ir.every.ShouldLog() {
log.Warning(ctx, err)
}
err := contextutil.RunWithTimeout(ctx, "async intent resolution",
asyncIntentResolutionTimeout, func(ctx context.Context) error {
_, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH)
return err
})
if err != nil && ir.every.ShouldLog() {
log.Warning(ctx, err)
}
}); err != nil {
return err
Expand Down Expand Up @@ -940,10 +937,7 @@ func (ir *IntentResolver) ResolveIntents(
b := &client.Batch{}
b.Header.MaxSpanRequestKeys = intentResolverBatchSize
b.AddRawRequest(req)
if err := contextutil.RunWithTimeout(ctx, "resolve span intents", intentResolverTimeout,
func(ctx context.Context) error {
return ir.db.Run(ctx, b)
}); err != nil {
if err := ir.db.Run(ctx, b); err != nil {
return err
}
// Check response to see if it must be resumed.
Expand Down

0 comments on commit ca88a83

Please sign in to comment.