Commit 9ca9450
kvcoord: prevent concurrent EndTxn requests
`TxnCoordSender` generally operates synchronously (i.e. the client waits for the previous response before sending the next request). However, the `txnHeartbeater` sends asynchronous `EndTxn(commit=false)` rollbacks when it discovers an aborted transaction record. Unfortunately, some code assumes synchrony, which caused race conditions with txn rollbacks.

In particular, the `txnPipeliner` attaches lock spans and in-flight writes to the `EndTxn` request for e.g. intent cleanup, but it only records this information when it receives responses. Thus, if an `EndTxn(commit=false)` is sent concurrently with a write request, the lock spans and in-flight writes of that write request will not get attached to the `EndTxn` request and the intents will not get cleaned up.

This patch makes the `txnHeartbeater` wait for any in-flight requests to complete before sending asynchronous rollbacks, and collapses incoming client rollbacks with in-flight async rollbacks.

Release note (bug fix): Fixed a race condition where transaction cleanup would fail to take into account ongoing writes and clean up their intents.
1 parent 760f605
4 files changed: +642 −105 lines
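The waiting-and-collapsing scheme the commit message describes is compact enough to model on its own. The sketch below is a hypothetical simplification in Go, not code from this commit: every name is illustrative, and it shows only how an abort that arrives while a request is in flight is deferred and then fired by the completing request.

package main

import (
	"fmt"
	"sync"
)

// coordinator is a toy stand-in for the txnHeartbeater's bookkeeping.
type coordinator struct {
	mu           sync.Mutex
	ifReqs       uint8 // in-flight requests; 0 or 1 under a synchronous client
	abortPending bool  // an async abort is waiting for the in-flight request
}

// send models SendLocked: it counts the request as in-flight and, once the
// request completes, runs any abort that was deferred in the meantime.
func (c *coordinator) send(do func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ifReqs++
	do() // the real code releases the mutex while the request is in flight
	c.ifReqs--
	if c.abortPending && c.ifReqs == 0 {
		c.abortPending = false
		c.abortLocked()
	}
}

// abortLocked models abortTxnAsyncLocked: if a request is in flight, defer
// the rollback so the request's lock spans are recorded before it is sent.
func (c *coordinator) abortLocked() {
	if c.ifReqs > 0 {
		c.abortPending = true
		return
	}
	fmt.Println("sending EndTxn(commit=false) rollback")
}

func main() {
	c := &coordinator{}
	// An abort arriving mid-request is deferred, then fired by send().
	c.send(func() { c.abortLocked() })
}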

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

+142 −18
@@ -16,12 +16,21 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	opentracing "github.com/opentracing/opentracing-go"
 )
 
+// abortTxnAsyncTimeout is the context timeout for abortTxnAsyncLocked()
+// rollbacks. If the intent resolver has spare async task capacity, this timeout
+// only needs to be long enough for the EndTxn request to make it through Raft,
+// but if the cleanup task is synchronous (to backpressure clients) then cleanup
+// will be abandoned when the timeout expires. We generally want to clean up if
+// possible, but not at any cost, so we set it high at 1 minute.
+const abortTxnAsyncTimeout = time.Minute
+
 // txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
 // loop. Transaction coordinators heartbeat their transaction record
 // periodically to indicate the liveness of their transaction. Other actors like
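`contextutil.RunWithTimeout` is CockroachDB's helper for bounding an operation with a deadline. As a rough stdlib approximation of how the constant above is applied to the cleanup task, assuming only standard `context` semantics (the helper's real implementation is not shown in this diff):

package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout is an illustrative stand-in for contextutil.RunWithTimeout:
// it bounds fn by handing it a child context that expires after timeout.
func runWithTimeout(
	ctx context.Context, timeout time.Duration, fn func(context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return fn(ctx)
}

func main() {
	err := runWithTimeout(context.Background(), time.Minute, func(ctx context.Context) error {
		// The cleanup must respect ctx: if it blocks past the deadline,
		// ctx.Done() fires and the work is abandoned, as the comment on
		// abortTxnAsyncTimeout describes.
		select {
		case <-time.After(10 * time.Millisecond): // stand-in for the EndTxn round trip
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println("cleanup result:", err)
}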
@@ -104,9 +113,37 @@ type txnHeartbeater struct {
 		// future requests sent though it (which indicates that the heartbeat
 		// loop did not race with an EndTxn request).
 		finalObservedStatus roachpb.TransactionStatus
+
+		// ifReqs tracks the number of in-flight requests. This is expected to
+		// be either 0 or 1, but we let the txnLockGatekeeper enforce that.
+		//
+		// This is used to make sure we don't send EndTxn(commit=false) from
+		// abortTxnAsyncLocked() concurrently with another in-flight request.
+		// The TxnCoordSender assumes synchronous operation; in particular,
+		// the txnPipeliner must update its lock spans with pending responses
+		// before attaching the final lock spans to the EndTxn request.
+		ifReqs uint8
+
+		// abortTxnAsyncPending, if true, signals that an abortTxnAsyncLocked()
+		// call is waiting for in-flight requests to complete. Once the last
+		// request returns (setting ifReqs=0), it calls abortTxnAsyncLocked().
+		abortTxnAsyncPending bool
+
+		// abortTxnAsyncResultC is non-nil when an abortTxnAsyncLocked()
+		// rollback is in-flight. If a client rollback arrives concurrently, it
+		// will wait for the result on this channel, collapsing the requests to
+		// prevent concurrent rollbacks. Only EndTxn(commit=false) requests can
+		// arrive during rollback, the TxnCoordSender blocks any others due to
+		// finalObservedStatus.
+		abortTxnAsyncResultC chan abortTxnAsyncResult
 	}
 }
 
+type abortTxnAsyncResult struct {
+	br   *roachpb.BatchResponse
+	pErr *roachpb.Error
+}
+
 // init initializes the txnHeartbeater. This method exists instead of a
 // constructor because txnHeartbeaters live in a pool in the TxnCoordSender.
 func (h *txnHeartbeater) init(
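The `abortTxnAsyncResultC` field implements a single-flight collapse: at most one rollback is ever in flight, and a caller that finds one already running waits for its result rather than issuing another. A minimal standalone sketch of that pattern follows, with hypothetical names throughout, and assuming at most one concurrent waiter (which the synchronous client protocol guarantees here):

package main

import (
	"fmt"
	"sync"
)

// result pairs a response with an error, as abortTxnAsyncResult pairs br
// and pErr. All names in this sketch are illustrative.
type result struct {
	val string
	err error
}

type worker struct {
	mu      sync.Mutex
	resultC chan result // non-nil while an operation is in flight
}

// run starts the operation unless one is already in flight; a caller that
// finds resultC non-nil collapses into the existing operation.
func (w *worker) run() result {
	w.mu.Lock()
	if c := w.resultC; c != nil {
		w.mu.Unlock()
		return <-c // wait for the in-flight operation's result
	}
	// A buffer of 1 lets the operation deposit its result even if no
	// concurrent caller ever arrives to read it.
	c := make(chan result, 1)
	w.resultC = c
	w.mu.Unlock()

	res := result{val: "EndTxn(commit=false) done"} // the operation itself

	w.mu.Lock()
	c <- res        // deposit first (buffered, never blocks)
	w.resultC = nil // no longer in flight
	w.mu.Unlock()
	return res
}

func main() {
	w := &worker{}
	fmt.Println(w.run().val)
}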
@@ -165,10 +202,46 @@ func (h *txnHeartbeater) SendLocked(
 	if hasET {
 		et := etArg.(*roachpb.EndTxnRequest)
 		et.TxnHeartbeating = h.mu.loopStarted
+
+		if !et.Commit {
+			// If an abortTxnAsyncLocked() rollback is in flight, we'll wait for
+			// its result here to avoid sending a concurrent rollback.
+			// Otherwise, txnLockGatekeeper would error since it does not allow
+			// concurrent requests (to enforce a synchronous client protocol).
+			if resultC := h.mu.abortTxnAsyncResultC; resultC != nil {
+				// We have to unlock the mutex while waiting, to allow the
+				// txnLockGatekeeper to acquire the mutex when receiving the
+				// async abort response. Once we receive our copy of the
+				// response, we re-acquire the lock to return it to the client.
+				h.mu.Unlock()
+				defer h.mu.Lock()
+				select {
+				case res := <-resultC:
+					return res.br, res.pErr
+				case <-ctx.Done():
+					return nil, roachpb.NewError(ctx.Err())
+				}
+			}
+		}
 	}
 
-	// Forward the batch through the wrapped lockedSender.
-	return h.wrapped.SendLocked(ctx, ba)
+	// Forward the batch through the wrapped lockedSender, recording the
+	// in-flight request to coordinate with abortTxnAsyncLocked(). Recall that
+	// the mutex is unlocked for the duration of the SendLocked() call.
+	h.mu.ifReqs++
+	br, pErr := h.wrapped.SendLocked(ctx, ba)
+	h.mu.ifReqs--
+
+	// If an abortTxnAsyncLocked() call is waiting for this in-flight
+	// request to complete, call it. At this point, finalObservedStatus has
+	// already been set, so we don't have to worry about additional incoming
+	// requests (except rollbacks) -- the TxnCoordSender will block them.
+	if h.mu.abortTxnAsyncPending && h.mu.ifReqs == 0 {
+		h.abortTxnAsyncLocked(ctx)
+		h.mu.abortTxnAsyncPending = false
+	}
+
+	return br, pErr
 }
 
 // setWrapped is part of the txnInterceptor interface.
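Note the `h.mu.Unlock()` / `defer h.mu.Lock()` pair in the hunk above: the method must drop the mutex while blocked on the channel, or the async task could never acquire it to deliver the result. A minimal sketch of this unlock-while-waiting pattern, with hypothetical names rather than the real types:

package main

import (
	"fmt"
	"sync"
)

type state struct {
	mu      sync.Mutex
	resultC chan string
}

// waitLocked is called with s.mu held and returns with s.mu held, but
// drops the lock for the duration of the channel wait.
func (s *state) waitLocked() string {
	resultC := s.resultC // read the field under the lock before releasing it
	s.mu.Unlock()
	defer s.mu.Lock() // re-acquire before returning to the caller
	return <-resultC
}

func main() {
	s := &state{resultC: make(chan string, 1)}
	s.mu.Lock()
	go func() {
		// The producer can take the lock only because the waiter released it.
		s.mu.Lock()
		s.resultC <- "rollback result"
		s.mu.Unlock()
	}()
	res := s.waitLocked()
	s.mu.Unlock()
	fmt.Println(res)
}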
@@ -321,7 +394,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 
 	var respTxn *roachpb.Transaction
 	if pErr != nil {
-		log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)
+		log.VEventf(ctx, 2, "heartbeat failed for %s: %s", h.mu.txn, pErr)
 
 		// We need to be prepared here to handle the case of a
 		// TransactionAbortedError with no transaction proto in it.
@@ -337,6 +410,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 		// its commit or an ambiguous one and we have nothing to offer that
 		// provides more clarity. We do however prevent it from running more
 		// requests in case it isn't aware that the transaction is over.
+		log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
 		h.abortTxnAsyncLocked(ctx)
 		h.mu.finalObservedStatus = roachpb.ABORTED
 		return false
@@ -356,6 +430,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 	case roachpb.ABORTED:
 		// Roll back the transaction record to clean up intents and
 		// then shut down the heartbeat loop.
+		log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
 		h.abortTxnAsyncLocked(ctx)
 	}
 	h.mu.finalObservedStatus = respTxn.Status
@@ -364,15 +439,19 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
 	return true
 }
 
-// abortTxnAsyncLocked send an EndTxn(commmit=false) asynchronously.
+// abortTxnAsyncLocked sends an EndTxn(commmit=false) asynchronously.
 // The purpose of the async cleanup is to resolve transaction intents as soon
 // as possible when a transaction coordinator observes an ABORTED transaction.
 func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
-	log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
 
-	// NB: We use context.Background() here because we don't want a canceled
-	// context to interrupt the aborting.
-	ctx = h.AnnotateCtx(context.Background())
+	// If a request is in flight, we must wait for it to complete first such
+	// that txnPipeliner can record its lock spans and attach them to the EndTxn
+	// request we'll send.
+	if h.mu.ifReqs > 0 {
+		h.mu.abortTxnAsyncPending = true
+		log.VEventf(ctx, 2, "async abort waiting for in-flight request for txn %s", h.mu.txn)
+		return
+	}
 
 	// Construct a batch with an EndTxn request.
 	txn := h.mu.txn.Clone()
@@ -386,17 +465,62 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
 		TxnHeartbeating: true,
 	})
 
+	const taskName = "txnHeartbeater: aborting txn"
 	log.VEventf(ctx, 2, "async abort for txn: %s", txn)
-	if err := h.stopper.RunAsyncTask(
-		ctx, "txnHeartbeater: aborting txn", func(ctx context.Context) {
-			// Send the abort request through the interceptor stack. This is
-			// important because we need the txnPipeliner to append lock spans
-			// to the EndTxn request.
-			h.mu.Lock()
-			defer h.mu.Unlock()
-			_, pErr := h.wrapped.SendLocked(ctx, ba)
-			if pErr != nil {
-				log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
+	if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
+		func(ctx context.Context) {
+			if err := contextutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
+				func(ctx context.Context) error {
+					h.mu.Lock()
+					defer h.mu.Unlock()
+
+					// If we find an abortTxnAsyncResultC, that means an async
+					// rollback request is already in flight, so there's no
+					// point in us running another. This can happen because the
+					// TxnCoordSender also calls abortTxnAsyncLocked()
+					// independently of the heartbeat loop.
+					if h.mu.abortTxnAsyncResultC != nil {
+						log.VEventf(ctx, 2,
+							"skipping async abort due to concurrent async abort for %s", txn)
+						return nil
+					}
+
+					// TxnCoordSender allows EndTxn(commit=false) through even
+					// after we set finalObservedStatus, and that request can
+					// race with us for the mutex. Thus, if we find an in-flight
+					// request here, after checking ifReqs=0 before being spawned,
+					// we deduce that it must have been a rollback and there's no
+					// point in sending another rollback.
+					if h.mu.ifReqs > 0 {
+						log.VEventf(ctx, 2,
+							"skipping async abort due to client rollback for %s", txn)
+						return nil
+					}
+
+					// Set up a result channel to signal to an incoming client
+					// rollback that an async rollback is already in progress,
+					// and pass it the result. The buffer allows storing the
+					// result even when no client rollback arrives. Recall that
+					// the SendLocked() call below releases the mutex while
+					// running, allowing concurrent incoming requests.
+					h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)
+
+					// Send the abort request through the interceptor stack. This is
+					// important because we need the txnPipeliner to append lock spans
+					// to the EndTxn request.
+					br, pErr := h.wrapped.SendLocked(ctx, ba)
+					if pErr != nil {
+						log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
+					}
+
+					// Pass the result to a waiting client rollback, if any, and
+					// remove the channel since we're no longer in flight.
+					h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
+					h.mu.abortTxnAsyncResultC = nil
+					return nil
+				},
+			); err != nil {
+				log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
 			}
 		},
 	); err != nil {
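A general point worth pulling out of the task body above: state observed before spawning an async task (here, ifReqs=0) can be stale by the time the task runs, so the task re-validates everything under the lock and bails out if it was superseded. A small self-contained sketch of that recheck-after-acquire pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

type txnState struct {
	mu       sync.Mutex
	inFlight int  // like ifReqs
	aborting bool // like abortTxnAsyncResultC != nil
}

func (t *txnState) maybeAbortAsync(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		t.mu.Lock()
		defer t.mu.Unlock()
		// Re-check: another async abort may have started, or a client
		// rollback may have raced in, while this goroutine was scheduled.
		if t.aborting || t.inFlight > 0 {
			fmt.Println("skipping async abort: superseded")
			return
		}
		t.aborting = true
		fmt.Println("sending async EndTxn(commit=false)")
	}()
}

func main() {
	var wg sync.WaitGroup
	t := &txnState{}
	t.maybeAbortAsync(&wg)
	t.maybeAbortAsync(&wg) // the loser of the race rechecks and bails out
	wg.Wait()
}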
