diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index f6694c2288fe..04b08af3cb75 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -82,7 +82,7 @@ import ( // TODO(ajwerner): Consider a more general purpose interface for this package. // While several interface-oriented interfaces have been explored they all felt -// heavy and allocation intensive +// heavy and allocation intensive. // TODO(ajwerner): Consider providing an interface which enables a single // goroutine to dispatch a number of requests destined for different ranges to @@ -271,11 +271,11 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } return nil } - if !ba.deadline.IsZero() { + if !ba.sendDeadline.IsZero() { actualSend := send send = func(context.Context) error { return contextutil.RunWithTimeout( - ctx, b.sendBatchOpName, timeutil.Until(ba.deadline), actualSend) + ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend) } } b.cfg.Stopper.RunWorker(ctx, func(ctx context.Context) { @@ -304,14 +304,14 @@ func addRequestToBatch(cfg *Config, now time.Time, ba *batch, r *request) (shoul // Update the deadline for the batch if this requests's deadline is later // than the current latest. rDeadline, rHasDeadline := r.ctx.Deadline() - // If this is the first request + // If this is the first request or if len(ba.reqs) == 0 || - // or we have requests and we have a deadline and - (len(ba.reqs) > 0 && !ba.latestCtxDeadline.IsZero() && - // this request doesn't have a deadline or has a later deadline - (!rHasDeadline || rDeadline.After(ba.latestCtxDeadline))) { + // there are already requests and there is a deadline and + (len(ba.reqs) > 0 && !ba.sendDeadline.IsZero() && + // this request either doesn't have a deadline or has a later deadline, + (!rHasDeadline || rDeadline.After(ba.sendDeadline))) { // set the deadline to this request's deadline. - ba.latestCtxDeadline = rDeadline + ba.sendDeadline = rDeadline } ba.reqs = append(ba.reqs, r) @@ -438,16 +438,19 @@ type batch struct { reqs []*request size int // bytes - // latestCtxDeadline is the latest deadline read from a context corresponding - // to a request in reqs. It will be zero valued if any request does not - // contain a deadline. - latestCtxDeadline time.Time + // sendDeadline is the latest deadline reported by a request's context. + // It will be zero valued if any request does not contain a deadline. + sendDeadline time.Time // idx is the batch's index in the batchQueue. idx int - deadline time.Time - startTime time.Time + // deadline is the time at which this batch should be sent according to the + // Batcher's configuration. + deadline time.Time + // startTime is the time at which the first request was added to the batch. + startTime time.Time + // lastUpdated is the latest time when a request was added to the batch. lastUpdated time.Time } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 7fcd476e40d2..3178150d361d 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -344,8 +344,13 @@ func TestBatchTimeout(t *testing.T) { assert.Len(t, s.ba.Requests, 2) s.respChan <- batchResp{} case <-ctx1.Done(): + // This case implies that the test did not exercise what was intended + // but that's okay, clean up the other request and return. + cancel2() + wg.Wait() return } + wg.Wait() testutils.IsError(err1, context.DeadlineExceeded.Error()) assert.Nil(t, err2) }) diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index 573d54f5fa84..4e3ff8577029 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -55,16 +55,12 @@ const ( // TODO(bdarnell): how to determine best value? defaultTaskLimit = 1000 - // intentResolverTimeout is the timeout when processing a group of intents. - // The timeout prevents intent resolution from getting stuck. Since - // processing intents is best effort, we'd rather give up than wait too long - // (this helps avoid deadlocks during test shutdown). - intentResolverTimeout = 30 * time.Second - - // gcTimeout is the timeout when processing gc of a batch of txn records. - // Since processing txn records is best effort, we'd rather give up than - // wait too long (this helps avoid deadlocks during test shutdown). - gcTimeout = 30 * time.Second + // asyncIntentResolutionTimeout is the timeout when processing a group of + // intents asynchronously. The timeout prevents intent async intent + // resolution from getting stuck. Since processing intents is best effort, + // we'd rather give up than wait too long (this helps avoid deadlocks during + // test shutdown). + asyncIntentResolutionTimeout = 30 * time.Second // intentResolverBatchSize is the maximum number of intents that will be // resolved in a single batch. Batches that span many ranges (which is @@ -204,7 +200,6 @@ func New(c Config) *IntentResolver { MaxIdle: c.MaxGCBatchIdle, Stopper: c.Stopper, Sender: c.DB.NonTransactionalSender(), - BatchTimeout: gcTimeout, }) batchSize := intentResolverBatchSize if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 { @@ -217,7 +212,6 @@ func New(c Config) *IntentResolver { MaxIdle: c.MaxIntentResolutionBatchIdle, Stopper: c.Stopper, Sender: c.DB.NonTransactionalSender(), - BatchTimeout: intentResolverTimeout, }) return ir } @@ -510,10 +504,13 @@ func (ir *IntentResolver) CleanupIntentsAsync( now := ir.clock.Now() for _, item := range intents { if err := ir.runAsyncTask(ctx, allowSyncProcessing, func(ctx context.Context) { - if _, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH); err != nil { - if ir.every.ShouldLog() { - log.Warning(ctx, err) - } + err := contextutil.RunWithTimeout(ctx, "async intent resolution", + asyncIntentResolutionTimeout, func(ctx context.Context) error { + _, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH) + return err + }) + if err != nil && ir.every.ShouldLog() { + log.Warning(ctx, err) } }); err != nil { return err @@ -940,10 +937,7 @@ func (ir *IntentResolver) ResolveIntents( b := &client.Batch{} b.Header.MaxSpanRequestKeys = intentResolverBatchSize b.AddRawRequest(req) - if err := contextutil.RunWithTimeout(ctx, "resolve span intents", intentResolverTimeout, - func(ctx context.Context) error { - return ir.db.Run(ctx, b) - }); err != nil { + if err := ir.db.Run(ctx, b); err != nil { return err } // Check response to see if it must be resumed.