Skip to content

Commit 909c46c

Browse files
committed
kvcoord: prevent concurrent EndTxn requests
`TxnCoordSender` generally operates synchronously (i.e. the client waits for the previous response before sending the next request). However, the `txnHeartbeater` sends asynchronous `EndTxn(commit=false)` rollbacks when it discovers an aborted transaction record. Unfortunately, some code assumes synchrony, which caused race conditions with txn rollbacks. In particular, the `txnPipeliner` attaches lock spans and in-flight writes to the `EndTxn` request for e.g. intent cleanup, but it only records this information when it receives responses. Thus, if an `EndTxn(commit=false)` is sent concurrently with a write request, the lock spans and in-flight writes of that write request will not get attached to the `EndTxn` request and the intents will not get cleaned up. This patch makes the `txnHeartbeater` wait for any in-flight requests to complete before sending asynchronous rollbacks, and collapses incoming client rollbacks with in-flight async rollbacks. Release note (bug fix): Fixed a race condition where transaction cleanup would fail to take into account ongoing writes and clean up their intents.
1 parent 6cd0067 commit 909c46c

5 files changed

+601
-106
lines changed

pkg/kv/kvclient/kvcoord/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ go_library(
4747
"//pkg/settings/cluster",
4848
"//pkg/storage/enginepb",
4949
"//pkg/util",
50+
"//pkg/util/contextutil",
5051
"//pkg/util/ctxgroup",
5152
"//pkg/util/errorutil/unimplemented",
5253
"//pkg/util/grpcutil",

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

+140-18
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@ import (
1616
"time"
1717

1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
1920
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
2122
"github.com/cockroachdb/cockroach/pkg/util/stop"
2223
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2324
)
2425

26+
// abortTxnAsyncTimeout is the context timeout for abortTxnAsyncLocked()
27+
// rollbacks. If the intent resolver has spare async task capacity, this timeout
28+
// only needs to be long enough for the EndTxn request to make it through Raft,
29+
// but if the cleanup task is synchronous (to backpressure clients) then cleanup
30+
// will be abandoned when the timeout expires. We generally want to clean up if
31+
// possible, but not at any cost, so we set it high at 1 minute.
32+
const abortTxnAsyncTimeout = time.Minute
33+
2534
// txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
2635
// loop. Transaction coordinators heartbeat their transaction record
2736
// periodically to indicate the liveness of their transaction. Other actors like
@@ -104,9 +113,37 @@ type txnHeartbeater struct {
104113
// future requests sent though it (which indicates that the heartbeat
105114
// loop did not race with an EndTxn request).
106115
finalObservedStatus roachpb.TransactionStatus
116+
117+
// ifReqs tracks the number of in-flight requests. This is expected to
118+
// be either 0 or 1, but we let the txnLockGatekeeper enforce that.
119+
//
120+
// This is used to make sure we don't send EndTxn(commit=false) from
121+
// abortTxnAsyncLocked() concurrently with another in-flight request.
122+
// The TxnCoordSender assumes synchronous operation; in particular,
123+
// the txnPipeliner must update its lock spans with pending responses
124+
// before attaching the final lock spans to the EndTxn request.
125+
ifReqs uint8
126+
127+
// abortTxnAsyncPending, if true, signals that an abortTxnAsyncLocked()
128+
// call is waiting for in-flight requests to complete. Once the last
129+
// request returns (setting ifReqs=0), it calls abortTxnAsyncLocked().
130+
abortTxnAsyncPending bool
131+
132+
// abortTxnAsyncResultC is non-nil when an abortTxnAsyncLocked()
133+
// rollback is in-flight. If a client rollback arrives concurrently, it
134+
// will wait for the result on this channel, collapsing the requests to
135+
// prevent concurrent rollbacks. Only EndTxn(commit=false) requests can
136+
// arrive during rollback, the TxnCoordSender blocks any others due to
137+
// finalObservedStatus.
138+
abortTxnAsyncResultC chan abortTxnAsyncResult
107139
}
108140
}
109141

142+
type abortTxnAsyncResult struct {
143+
br *roachpb.BatchResponse
144+
pErr *roachpb.Error
145+
}
146+
110147
// init initializes the txnHeartbeater. This method exists instead of a
111148
// constructor because txnHeartbeaters live in a pool in the TxnCoordSender.
112149
func (h *txnHeartbeater) init(
@@ -175,11 +212,45 @@ func (h *txnHeartbeater) SendLocked(
175212
// with the next epoch.
176213
if !et.Commit {
177214
h.cancelHeartbeatLoopLocked()
215+
216+
// If an abortTxnAsyncLocked() rollback is in flight, we'll wait for
217+
// its result here to avoid sending a concurrent rollback.
218+
// Otherwise, txnLockGatekeeper would error since it does not allow
219+
// concurrent requests (to enforce a synchronous client protocol).
220+
if resultC := h.mu.abortTxnAsyncResultC; resultC != nil {
221+
// We have to unlock the mutex while waiting, to allow the
222+
// txnLockGatekeeper to acquire the mutex when receiving the
223+
// async abort response. Once we receive our copy of the
224+
// response, we re-acquire the lock to return it to the client.
225+
h.mu.Unlock()
226+
defer h.mu.Lock()
227+
select {
228+
case res := <-resultC:
229+
return res.br, res.pErr
230+
case <-ctx.Done():
231+
return nil, roachpb.NewError(ctx.Err())
232+
}
233+
}
178234
}
179235
}
180236

181-
// Forward the batch through the wrapped lockedSender.
182-
return h.wrapped.SendLocked(ctx, ba)
237+
// Forward the batch through the wrapped lockedSender, recording the
238+
// in-flight request to coordinate with abortTxnAsyncLocked(). Recall that
239+
// the mutex is unlocked for the duration of the SendLocked() call.
240+
h.mu.ifReqs++
241+
br, pErr := h.wrapped.SendLocked(ctx, ba)
242+
h.mu.ifReqs--
243+
244+
// If an abortTxnAsyncLocked() call is waiting for this in-flight
245+
// request to complete, call it. At this point, finalObservedStatus has
246+
// already been set, so we don't have to worry about additional incoming
247+
// requests (except rollbacks) -- the TxnCoordSender will block them.
248+
if h.mu.abortTxnAsyncPending && h.mu.ifReqs == 0 {
249+
h.abortTxnAsyncLocked(ctx)
250+
h.mu.abortTxnAsyncPending = false
251+
}
252+
253+
return br, pErr
183254
}
184255

185256
// setWrapped is part of the txnInterceptor interface.
@@ -332,7 +403,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
332403

333404
var respTxn *roachpb.Transaction
334405
if pErr != nil {
335-
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)
406+
log.VEventf(ctx, 2, "heartbeat failed for %s: %s", h.mu.txn, pErr)
336407

337408
// We need to be prepared here to handle the case of a
338409
// TransactionAbortedError with no transaction proto in it.
@@ -348,6 +419,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
348419
// its commit or an ambiguous one and we have nothing to offer that
349420
// provides more clarity. We do however prevent it from running more
350421
// requests in case it isn't aware that the transaction is over.
422+
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
351423
h.abortTxnAsyncLocked(ctx)
352424
h.mu.finalObservedStatus = roachpb.ABORTED
353425
return false
@@ -367,6 +439,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
367439
case roachpb.ABORTED:
368440
// Roll back the transaction record to clean up intents and
369441
// then shut down the heartbeat loop.
442+
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
370443
h.abortTxnAsyncLocked(ctx)
371444
}
372445
h.mu.finalObservedStatus = respTxn.Status
@@ -375,15 +448,19 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
375448
return true
376449
}
377450

378-
// abortTxnAsyncLocked send an EndTxn(commmit=false) asynchronously.
451+
// abortTxnAsyncLocked sends an EndTxn(commmit=false) asynchronously.
379452
// The purpose of the async cleanup is to resolve transaction intents as soon
380453
// as possible when a transaction coordinator observes an ABORTED transaction.
381454
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
382-
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
383455

384-
// NB: We use context.Background() here because we don't want a canceled
385-
// context to interrupt the aborting.
386-
ctx = h.AnnotateCtx(context.Background())
456+
// If a request is in flight, we must wait for it to complete first such
457+
// that txnPipeliner can record its lock spans and attach them to the EndTxn
458+
// request we'll send.
459+
if h.mu.ifReqs > 0 {
460+
h.mu.abortTxnAsyncPending = true
461+
log.VEventf(ctx, 2, "async abort waiting for in-flight request for txn %s", h.mu.txn)
462+
return
463+
}
387464

388465
// Construct a batch with an EndTxn request.
389466
txn := h.mu.txn.Clone()
@@ -397,17 +474,62 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
397474
TxnHeartbeating: true,
398475
})
399476

477+
const taskName = "txnHeartbeater: aborting txn"
400478
log.VEventf(ctx, 2, "async abort for txn: %s", txn)
401-
if err := h.stopper.RunAsyncTask(
402-
ctx, "txnHeartbeater: aborting txn", func(ctx context.Context) {
403-
// Send the abort request through the interceptor stack. This is
404-
// important because we need the txnPipeliner to append lock spans
405-
// to the EndTxn request.
406-
h.mu.Lock()
407-
defer h.mu.Unlock()
408-
_, pErr := h.wrapped.SendLocked(ctx, ba)
409-
if pErr != nil {
410-
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
479+
if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
480+
func(ctx context.Context) {
481+
if err := contextutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
482+
func(ctx context.Context) error {
483+
h.mu.Lock()
484+
defer h.mu.Unlock()
485+
486+
// If we find an abortTxnAsyncResultC, that means an async
487+
// rollback request is already in flight, so there's no
488+
// point in us running another. This can happen because the
489+
// TxnCoordSender also calls abortTxnAsyncLocked()
490+
// independently of the heartbeat loop.
491+
if h.mu.abortTxnAsyncResultC != nil {
492+
log.VEventf(ctx, 2,
493+
"skipping async abort due to concurrent async abort for %s", txn)
494+
return nil
495+
}
496+
497+
// TxnCoordSender allows EndTxn(commit=false) through even
498+
// after we set finalObservedStatus, and that request can
499+
// race with us for the mutex. Thus, if we find an in-flight
500+
// request here, after checking ifReqs=0 before being spawned,
501+
// we deduce that it must have been a rollback and there's no
502+
// point in sending another rollback.
503+
if h.mu.ifReqs > 0 {
504+
log.VEventf(ctx, 2,
505+
"skipping async abort due to client rollback for %s", txn)
506+
return nil
507+
}
508+
509+
// Set up a result channel to signal to an incoming client
510+
// rollback that an async rollback is already in progress,
511+
// and pass it the result. The buffer allows storing the
512+
// result even when no client rollback arrives. Recall that
513+
// the SendLocked() call below releases the mutex while
514+
// running, allowing concurrent incoming requests.
515+
h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)
516+
517+
// Send the abort request through the interceptor stack. This is
518+
// important because we need the txnPipeliner to append lock spans
519+
// to the EndTxn request.
520+
br, pErr := h.wrapped.SendLocked(ctx, ba)
521+
if pErr != nil {
522+
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
523+
}
524+
525+
// Pass the result to a waiting client rollback, if any, and
526+
// remove the channel since we're no longer in flight.
527+
h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
528+
h.mu.abortTxnAsyncResultC = nil
529+
return nil
530+
},
531+
); err != nil {
532+
log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
411533
}
412534
},
413535
); err != nil {

0 commit comments

Comments
 (0)