Skip to content

Commit

Permalink
Merge #37103
Browse files Browse the repository at this point in the history
37103: intentresolver: use caller's context deadline in intent resolution r=nvanbenschoten a=ajwerner

Having a single timeout value for all batches sent by a request batcher has
proven problematic. The first commit changes the behavior to respect the latest
deadline for a request in a batch. If any request has no deadline, the batch
uses no deadline. The second commit applies a timeout, equal to the value
previously applied to all batches, to asynchronous intent resolution calls.

This change isn't exactly equivalent to the prior behavior for async intent
resolution. In particular, the timeout applies to all of the requests rather than
to individual requests. Given the timeout, this seems reasonable. If we fail
to resolve all of the intents we want to within 30s, something else is probably
wrong or we're doing too much work. 


Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Apr 25, 2019
2 parents e28fad1 + 9b8a2af commit 96f73a3
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 54 deletions.
62 changes: 44 additions & 18 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import (

// TODO(ajwerner): Consider a more general purpose interface for this package.
// While several interface-oriented interfaces have been explored they all felt
// heavy and allocation intensive
// heavy and allocation intensive.

// TODO(ajwerner): Consider providing an interface which enables a single
// goroutine to dispatch a number of requests destined for different ranges to
Expand Down Expand Up @@ -134,10 +134,6 @@ type Config struct {
// DefaultInFlightBackpressureLimit.
InFlightBackpressureLimit int

// BatchTimeout is the timeout which is applied to sending a batch.
// A non-positive value implies no timeout.
BatchTimeout time.Duration

// NowFunc is used to determine the current time. It defaults to timeutil.Now.
NowFunc func() time.Time
}
Expand Down Expand Up @@ -236,7 +232,8 @@ func (b *RequestBatcher) SendWithChan(
}

// Send sends req as a part of a batch. An error is returned if the context
// is canceled before the sending of the request completes.
// is canceled before the sending of the request completes. The context with
// the latest deadline for a batch is used to send the underlying batch request.
func (b *RequestBatcher) Send(
ctx context.Context, rangeID roachpb.RangeID, req roachpb.Request,
) (roachpb.Response, error) {
Expand Down Expand Up @@ -266,17 +263,24 @@ func (b *RequestBatcher) sendDone(ba *batch) {
}

func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
var br *roachpb.BatchResponse
send := func(ctx context.Context) error {
var pErr *roachpb.Error
if br, pErr = b.cfg.Sender.Send(ctx, ba.batchRequest()); pErr != nil {
return pErr.GoError()
}
return nil
}
if !ba.sendDeadline.IsZero() {
actualSend := send
send = func(context.Context) error {
return contextutil.RunWithTimeout(
ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend)
}
}
b.cfg.Stopper.RunWorker(ctx, func(ctx context.Context) {
defer b.sendDone(ba)
var br *roachpb.BatchResponse
err := contextutil.RunWithTimeout(ctx, b.sendBatchOpName,
b.cfg.BatchTimeout, func(ctx context.Context) error {
var pErr *roachpb.Error
if br, pErr = b.cfg.Sender.Send(ctx, ba.batchRequest()); pErr != nil {
return pErr.GoError()
}
return nil
})
err := send(ctx)
for i, r := range ba.reqs {
res := Response{}
if br != nil && i < len(br.Responses) {
Expand All @@ -297,9 +301,23 @@ func (b *RequestBatcher) sendResponse(req *request, resp Response) {
}

func addRequestToBatch(cfg *Config, now time.Time, ba *batch, r *request) (shouldSend bool) {
// Update the deadline for the batch if this request's deadline is later
// than the current latest.
rDeadline, rHasDeadline := r.ctx.Deadline()
// If this is the first request or
if len(ba.reqs) == 0 ||
// there are already requests and there is a deadline and
(len(ba.reqs) > 0 && !ba.sendDeadline.IsZero() &&
// this request either doesn't have a deadline or has a later deadline,
(!rHasDeadline || rDeadline.After(ba.sendDeadline))) {
// set the deadline to this request's deadline.
ba.sendDeadline = rDeadline
}

ba.reqs = append(ba.reqs, r)
ba.size += r.req.Size()
ba.lastUpdated = now

if cfg.MaxIdle > 0 {
ba.deadline = ba.lastUpdated.Add(cfg.MaxIdle)
}
Expand Down Expand Up @@ -378,7 +396,7 @@ func (b *RequestBatcher) run(ctx context.Context) {
if !deadline.Equal(nextDeadline) || timer.Read {
deadline = nextDeadline
if !deadline.IsZero() {
timer.Reset(time.Until(deadline))
timer.Reset(timeutil.Until(deadline))
} else {
// Clear the current timer due to a sole batch already sent before
// the timer fired.
Expand Down Expand Up @@ -420,11 +438,19 @@ type batch struct {
reqs []*request
size int // bytes

// sendDeadline is the latest deadline reported by a request's context.
// It will be zero valued if any request does not contain a deadline.
sendDeadline time.Time

// idx is the batch's index in the batchQueue.
idx int

deadline time.Time
startTime time.Time
// deadline is the time at which this batch should be sent according to the
// Batcher's configuration.
deadline time.Time
// startTime is the time at which the first request was added to the batch.
startTime time.Time
// lastUpdated is the latest time when a request was added to the batch.
lastUpdated time.Time
}

Expand Down
72 changes: 56 additions & 16 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -296,26 +295,67 @@ func TestPanicWithNilStopper(t *testing.T) {
New(Config{Sender: make(chanSender)})
}

// TestBatchTimeout verifies that the RequestBatcher respects its BatchTimeout
// configuration option.
// TestBatchTimeout verifies that the RequestBatcher uses the context with the
// deadline from the latest call to send.
func TestBatchTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
b := New(Config{
// MaxMsgsPerBatch of 1 is chosen so that the first call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
// BatchTimeout is chosen to be a very small, non-zero value.
BatchTimeout: time.Microsecond,
t.Run("WithTimeout", func(t *testing.T) {
b := New(Config{
// MaxMsgsPerBatch of 1 is chosen so that the first call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond)
defer cancel()
resp, err := b.Send(ctx, 1, &roachpb.GetRequest{})
assert.Nil(t, resp)
testutils.IsError(err, context.DeadlineExceeded.Error())
})
t.Run("NoTimeout", func(t *testing.T) {
b := New(Config{
// MaxMsgsPerBatch of 2 is chosen so that the second call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 2,
Sender: sc,
Stopper: stopper,
})
// This test attempts to verify that a batch with two requests where one
// carries a timeout leads to the batch being sent without a timeout.
// There is a hazard that the goroutine which is being canceled is not
// able to send its request to the batcher before its deadline expires
// in which case the batch is never sent due to size constraints.
// The test will pass in this scenario after logging and cleaning up.
const timeout = 5 * time.Millisecond
ctx1, cancel1 := context.WithTimeout(context.Background(), timeout)
defer cancel1()
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
var wg sync.WaitGroup
wg.Add(2)
var err1, err2 error
go func() { _, err1 = b.Send(ctx1, 1, &roachpb.GetRequest{}); wg.Done() }()
go func() { _, err2 = b.Send(ctx2, 1, &roachpb.GetRequest{}); wg.Done() }()
select {
case s := <-sc:
assert.Len(t, s.ba.Requests, 2)
s.respChan <- batchResp{}
case <-ctx1.Done():
// This case implies that the test did not exercise what was intended
// but that's okay, clean up the other request and return.
t.Logf("canceled goroutine failed to send within %v, passing", timeout)
cancel2()
wg.Wait()
return
}
wg.Wait()
testutils.IsError(err1, context.DeadlineExceeded.Error())
assert.Nil(t, err2)
})
resp, err := b.Send(context.Background(), 1, &roachpb.GetRequest{})
assert.Nil(t, resp)
_, isTimeoutError := err.(contextutil.TimeoutError)
require.True(t, isTimeoutError)
}

// TestIdleAndMaxTimeoutDisabled exercises the RequestBatcher when it is
Expand Down
33 changes: 13 additions & 20 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,11 @@ const (
// TODO(bdarnell): how to determine best value?
defaultTaskLimit = 1000

// intentResolverTimeout is the timeout when processing a group of intents.
// The timeout prevents intent resolution from getting stuck. Since
// processing intents is best effort, we'd rather give up than wait too long
// (this helps avoid deadlocks during test shutdown).
intentResolverTimeout = 30 * time.Second

// gcTimeout is the timeout when processing gc of a batch of txn records.
// Since processing txn records is best effort, we'd rather give up than
// wait too long (this helps avoid deadlocks during test shutdown).
gcTimeout = 30 * time.Second
// asyncIntentResolutionTimeout is the timeout when processing a group of
// intents asynchronously. The timeout prevents async intent resolution from
// getting stuck. Since processing intents is best effort, we'd rather give
// up than wait too long (this helps avoid deadlocks during test shutdown).
asyncIntentResolutionTimeout = 30 * time.Second

// intentResolverBatchSize is the maximum number of intents that will be
// resolved in a single batch. Batches that span many ranges (which is
Expand Down Expand Up @@ -204,7 +199,6 @@ func New(c Config) *IntentResolver {
MaxIdle: c.MaxGCBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
BatchTimeout: gcTimeout,
})
batchSize := intentResolverBatchSize
if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
Expand All @@ -217,7 +211,6 @@ func New(c Config) *IntentResolver {
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
BatchTimeout: intentResolverTimeout,
})
return ir
}
Expand Down Expand Up @@ -510,10 +503,13 @@ func (ir *IntentResolver) CleanupIntentsAsync(
now := ir.clock.Now()
for _, item := range intents {
if err := ir.runAsyncTask(ctx, allowSyncProcessing, func(ctx context.Context) {
if _, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH); err != nil {
if ir.every.ShouldLog() {
log.Warning(ctx, err)
}
err := contextutil.RunWithTimeout(ctx, "async intent resolution",
asyncIntentResolutionTimeout, func(ctx context.Context) error {
_, err := ir.CleanupIntents(ctx, item.Intents, now, roachpb.PUSH_TOUCH)
return err
})
if err != nil && ir.every.ShouldLog() {
log.Warning(ctx, err)
}
}); err != nil {
return err
Expand Down Expand Up @@ -940,10 +936,7 @@ func (ir *IntentResolver) ResolveIntents(
b := &client.Batch{}
b.Header.MaxSpanRequestKeys = intentResolverBatchSize
b.AddRawRequest(req)
if err := contextutil.RunWithTimeout(ctx, "resolve span intents", intentResolverTimeout,
func(ctx context.Context) error {
return ir.db.Run(ctx, b)
}); err != nil {
if err := ir.db.Run(ctx, b); err != nil {
return err
}
// Check response to see if it must be resumed.
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/timeutil/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func Since(t time.Time) time.Duration {
return Now().Sub(t)
}

// Until reports how long remains before t is reached.
// It is equivalent to calling t.Sub(Now()).
func Until(t time.Time) time.Duration {
	current := Now()
	return t.Sub(current)
}

// UnixEpoch represents the Unix epoch, January 1, 1970 UTC.
var UnixEpoch = time.Unix(0, 0).UTC()

Expand Down

0 comments on commit 96f73a3

Please sign in to comment.