Skip to content

Commit e5993e4

Browse files
committed
kvcoord: prevent concurrent EndTxn requests
`TxnCoordSender` generally operates synchronously (i.e. the client waits for the previous response before sending the next request). However, the `txnHeartbeater` sends asynchronous `EndTxn(commit=false)` rollbacks when it discovers an aborted transaction record. Unfortunately, some code assumes synchrony, which caused race conditions with txn rollbacks. In particular, the `txnPipeliner` attaches lock spans and in-flight writes to the `EndTxn` request for e.g. intent cleanup, but it only records this information when it receives responses. Thus, if an `EndTxn(commit=false)` is sent concurrently with a write request, the lock spans and in-flight writes of that write request will not get attached to the `EndTxn` request and the intents will not get cleaned up. This patch makes the `txnHeartbeater` wait for any in-flight requests to complete before sending asynchronous rollbacks. Release note (bug fix): Fixed a race condition where transaction cleanup would fail to take into account ongoing writes and clean up their intents.
1 parent 9e58831 commit e5993e4

5 files changed

+573
-101
lines changed

pkg/kv/kvclient/kvcoord/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ go_library(
4747
"//pkg/settings/cluster",
4848
"//pkg/storage/enginepb",
4949
"//pkg/util",
50+
"//pkg/util/contextutil",
5051
"//pkg/util/ctxgroup",
5152
"//pkg/util/errorutil/unimplemented",
5253
"//pkg/util/grpcutil",

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

+113-14
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@ import (
1616
"time"
1717

1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
1920
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
2122
"github.com/cockroachdb/cockroach/pkg/util/stop"
2223
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2324
)
2425

26+
// abortTxnAsyncTimeout is the context timeout for abortTxnAsyncLocked()
27+
// rollbacks. If the intent resolver has spare async task capacity, this timeout
28+
// only needs to be long enough for the EndTxn request to make it through Raft,
29+
// but if the cleanup task is synchronous (to backpressure clients) then cleanup
30+
// will be abandoned when the timeout expires. We generally want to clean up if
31+
// possible, but not at any cost, so we set it high at 1 minute.
32+
const abortTxnAsyncTimeout = time.Minute
33+
2534
// txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
2635
// loop. Transaction coordinators heartbeat their transaction record
2736
// periodically to indicate the liveness of their transaction. Other actors like
@@ -104,9 +113,37 @@ type txnHeartbeater struct {
104113
// future requests sent through it (which indicates that the heartbeat
105114
// loop did not race with an EndTxn request).
106115
finalObservedStatus roachpb.TransactionStatus
116+
117+
// ifReqs tracks the number of in-flight requests. This is expected to
118+
// be either 0 or 1, but we let the txnLockGatekeeper enforce that.
119+
//
120+
// This is used to make sure we don't send EndTxn(commit=false) from
121+
// abortTxnAsyncLocked() concurrently with another in-flight request.
122+
// The TxnCoordSender assumes synchronous operation; in particular,
123+
// the txnPipeliner must update its lock spans with pending responses
124+
// before attaching the final lock spans to the EndTxn request.
125+
ifReqs uint8
126+
127+
// abortTxnAsyncPending, if true, signals that an abortTxnAsyncLocked()
128+
// call is waiting for in-flight requests to complete. Once the last
129+
// request returns (setting ifReqs=0), it calls abortTxnAsyncLocked().
130+
abortTxnAsyncPending bool
131+
132+
// abortTxnAsyncResultC is non-nil when an abortTxnAsyncLocked()
133+
// rollback is in-flight. If a client rollback arrives concurrently, it
134+
// will wait for the result on this channel, collapsing the requests.
135+
// This prevents violating the txnLockGatekeeper concurrency assertion.
136+
// Only EndTxn(commit=false) requests can arrive during rollback; the
137+
// TxnCoordSender blocks any others due to finalObservedStatus.
138+
abortTxnAsyncResultC chan abortTxnAsyncResult
107139
}
108140
}
109141

142+
type abortTxnAsyncResult struct {
143+
br *roachpb.BatchResponse
144+
pErr *roachpb.Error
145+
}
146+
110147
// init initializes the txnHeartbeater. This method exists instead of a
111148
// constructor because txnHeartbeaters live in a pool in the TxnCoordSender.
112149
func (h *txnHeartbeater) init(
@@ -175,9 +212,38 @@ func (h *txnHeartbeater) SendLocked(
175212
// with the next epoch.
176213
if !et.Commit {
177214
h.cancelHeartbeatLoopLocked()
215+
216+
// If an abortTxnAsyncLocked() rollback is in flight, we'll wait
217+
// for the result here to avoid violating the txnLockGatekeeper
218+
// concurrency assertion.
219+
if resultC := h.mu.abortTxnAsyncResultC; resultC != nil {
220+
h.mu.Unlock()
221+
defer h.mu.Lock()
222+
select {
223+
case res := <-resultC:
224+
return res.br, res.pErr
225+
case <-ctx.Done():
226+
return nil, roachpb.NewError(ctx.Err())
227+
}
228+
}
178229
}
179230
}
180231

232+
// Count this request as in-flight until its response has been received.
233+
h.mu.ifReqs++
234+
defer func() {
235+
h.mu.ifReqs--
236+
237+
// If an abortTxnAsyncLocked() call is waiting for this in-flight
238+
// request to complete, call it. At this point, finalObservedStatus has
239+
// already been set, so we don't have to worry about additional incoming
240+
// requests (except rollbacks); the TxnCoordSender will block them.
241+
if h.mu.abortTxnAsyncPending && h.mu.ifReqs == 0 {
242+
h.abortTxnAsyncLocked(ctx)
243+
h.mu.abortTxnAsyncPending = false
244+
}
245+
}()
246+
181247
// Forward the batch through the wrapped lockedSender.
182248
return h.wrapped.SendLocked(ctx, ba)
183249
}
@@ -379,11 +445,26 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
379445
// The purpose of the async cleanup is to resolve transaction intents as soon
380446
// as possible when a transaction coordinator observes an ABORTED transaction.
381447
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
382-
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
383448

384-
// NB: We use context.Background() here because we don't want a canceled
385-
// context to interrupt the aborting.
386-
ctx = h.AnnotateCtx(context.Background())
449+
// If a request is in flight, we must wait for it to complete first such
450+
// that txnPipeliner can record its lock spans and attach them to the EndTxn
451+
// request we'll send.
452+
if h.mu.ifReqs > 0 {
453+
h.mu.abortTxnAsyncPending = true
454+
log.VEventf(ctx, 1,
455+
"Heartbeat detected aborted txn. Waiting for in-flight request before cleaning up.")
456+
return
457+
}
458+
459+
// If another abortTxnAsyncLocked attempt is already in flight, bail out.
460+
// We check this since the TxnCoordSender also calls abortTxnAsyncLocked()
461+
// independently of the heartbeat loop.
462+
if h.mu.abortTxnAsyncResultC != nil {
463+
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, but found concurrent abort attempt.")
464+
return
465+
}
466+
467+
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
387468

388469
// Construct a batch with an EndTxn request.
389470
txn := h.mu.txn.Clone()
@@ -397,21 +478,39 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
397478
TxnHeartbeating: true,
398479
})
399480

481+
// Set up a result channel to signal to an incoming client rollback that
482+
// an async rollback is already in progress and pass it the result. The
483+
// buffer allows storing the result even when no client rollback arrives.
484+
h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)
485+
400486
log.VEventf(ctx, 2, "async abort for txn: %s", txn)
401-
if err := h.stopper.RunAsyncTask(
402-
ctx, "txnHeartbeater: aborting txn", func(ctx context.Context) {
403-
// Send the abort request through the interceptor stack. This is
404-
// important because we need the txnPipeliner to append lock spans
405-
// to the EndTxn request.
406-
h.mu.Lock()
407-
defer h.mu.Unlock()
408-
_, pErr := h.wrapped.SendLocked(ctx, ba)
409-
if pErr != nil {
410-
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
487+
const taskName = "txnHeartbeater: aborting txn"
488+
if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
489+
func(ctx context.Context) {
490+
if err := contextutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
491+
func(ctx context.Context) error {
492+
// Send the abort request through the interceptor stack. This is
493+
// important because we need the txnPipeliner to append lock spans
494+
// to the EndTxn request.
495+
h.mu.Lock()
496+
defer h.mu.Unlock()
497+
br, pErr := h.wrapped.SendLocked(ctx, ba)
498+
if pErr != nil {
499+
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
500+
}
501+
// Pass the result to a waiting client rollback, if any, and
502+
// remove the channel since we're no longer in flight.
503+
h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
504+
h.mu.abortTxnAsyncResultC = nil
505+
return nil
506+
},
507+
); err != nil {
508+
log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
411509
}
412510
},
413511
); err != nil {
414512
log.Warningf(ctx, "%v", err)
513+
h.mu.abortTxnAsyncResultC = nil // task wasn't started after all
415514
}
416515
}
417516

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go

+197
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,203 @@ func TestTxnHeartbeaterAsyncAbort(t *testing.T) {
417417
})
418418
}
419419

420+
// TestTxnHeartbeaterAsyncAbortWaitsForInFlight tests that the txnHeartbeater
421+
// will wait for an in-flight request to complete before sending the
422+
// EndTxn rollback request.
423+
func TestTxnHeartbeaterAsyncAbortWaitsForInFlight(t *testing.T) {
424+
defer leaktest.AfterTest(t)()
425+
defer log.Scope(t).Close(t)
426+
ctx := context.Background()
427+
txn := makeTxnProto()
428+
th, mockSender, mockGatekeeper := makeMockTxnHeartbeater(&txn)
429+
defer th.stopper.Stop(ctx)
430+
431+
// Mock the heartbeat request, which should wait for an in-flight put via
432+
// putReady then return an aborted txn and signal hbAborted.
433+
putReady := make(chan struct{})
434+
hbAborted := make(chan struct{})
435+
mockGatekeeper.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
436+
<-putReady
437+
defer close(hbAborted)
438+
439+
require.Len(t, ba.Requests, 1)
440+
require.IsType(t, &roachpb.HeartbeatTxnRequest{}, ba.Requests[0].GetInner())
441+
442+
br := ba.CreateReply()
443+
br.Txn = ba.Txn
444+
br.Txn.Status = roachpb.ABORTED
445+
return br, nil
446+
})
447+
448+
putResume := make(chan struct{})
449+
rollbackSent := make(chan struct{})
450+
mockSender.ChainMockSend(
451+
// Mock a Put, which signals putReady and then waits for putResume
452+
// before returning a response.
453+
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
454+
th.mu.Unlock() // without txnLockGatekeeper, we must unlock manually
455+
defer th.mu.Lock()
456+
close(putReady)
457+
require.Len(t, ba.Requests, 1)
458+
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
459+
460+
<-putResume
461+
462+
br := ba.CreateReply()
463+
br.Txn = ba.Txn
464+
return br, nil
465+
},
466+
// Mock an EndTxn, which signals rollbackSent.
467+
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
468+
defer close(rollbackSent)
469+
require.Len(t, ba.Requests, 1)
470+
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
471+
472+
etReq := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
473+
require.Equal(t, &txn, ba.Txn)
474+
require.False(t, etReq.Commit)
475+
require.True(t, etReq.Poison)
476+
require.True(t, etReq.TxnHeartbeating)
477+
478+
br := ba.CreateReply()
479+
br.Txn = ba.Txn
480+
br.Txn.Status = roachpb.ABORTED
481+
return br, nil
482+
},
483+
)
484+
485+
// Spawn a goroutine to send the Put.
486+
require.NoError(t, th.stopper.RunAsyncTask(ctx, "put", func(ctx context.Context) {
487+
var ba roachpb.BatchRequest
488+
ba.Header = roachpb.Header{Txn: txn.Clone()}
489+
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}})
490+
491+
th.mu.Lock() // without TxnCoordSender, we must lock manually
492+
defer th.mu.Unlock()
493+
br, pErr := th.SendLocked(ctx, ba)
494+
require.Nil(t, pErr)
495+
require.NotNil(t, br)
496+
}))
497+
498+
<-putReady // wait for put
499+
<-hbAborted // wait for heartbeat abort
500+
select {
501+
case <-rollbackSent: // we don't expect a rollback yet
502+
require.Fail(t, "received unexpected EndTxn")
503+
case <-time.After(20 * time.Millisecond):
504+
}
505+
close(putResume) // make put return
506+
<-rollbackSent // we now expect the rollback
507+
508+
// The heartbeat loop should eventually close.
509+
waitForHeartbeatLoopToStop(t, &th)
510+
}
511+
512+
// TestTxnHeartbeaterAsyncAbortCollapsesRequests tests that when the
513+
// txnHeartbeater has an async abort rollback in flight, any client
514+
// rollbacks will wait for the async rollback to complete and return
515+
// its result.
516+
func TestTxnHeartbeaterAsyncAbortCollapsesRequests(t *testing.T) {
517+
defer leaktest.AfterTest(t)()
518+
defer log.Scope(t).Close(t)
519+
ctx := context.Background()
520+
txn := makeTxnProto()
521+
th, mockSender, mockGatekeeper := makeMockTxnHeartbeater(&txn)
522+
defer th.stopper.Stop(ctx)
523+
524+
// Mock the heartbeat request, which simply aborts and signals hbAborted.
525+
hbAborted := make(chan struct{})
526+
mockGatekeeper.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
527+
defer close(hbAborted)
528+
529+
require.Len(t, ba.Requests, 1)
530+
require.IsType(t, &roachpb.HeartbeatTxnRequest{}, ba.Requests[0].GetInner())
531+
532+
br := ba.CreateReply()
533+
br.Txn = ba.Txn
534+
br.Txn.Status = roachpb.ABORTED
535+
return br, nil
536+
})
537+
538+
// Mock an EndTxn response, which signals rollbackReady and blocks
539+
// until rollbackUnblock is closed.
540+
rollbackReady := make(chan struct{})
541+
rollbackUnblock := make(chan struct{})
542+
mockSender.ChainMockSend(
543+
// The first Put request is expected and should just return.
544+
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
545+
require.Len(t, ba.Requests, 1)
546+
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
547+
548+
br := ba.CreateReply()
549+
br.Txn = ba.Txn
550+
return br, nil
551+
},
552+
// The first EndTxn request from the heartbeater is expected, so block and return.
553+
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
554+
th.mu.Unlock() // manually unlock for concurrency, no txnLockGatekeeper
555+
defer th.mu.Lock()
556+
close(rollbackReady)
557+
require.Len(t, ba.Requests, 1)
558+
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
559+
560+
<-rollbackUnblock
561+
562+
etReq := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
563+
require.Equal(t, &txn, ba.Txn)
564+
require.False(t, etReq.Commit)
565+
require.True(t, etReq.Poison)
566+
require.True(t, etReq.TxnHeartbeating)
567+
568+
br := ba.CreateReply()
569+
br.Txn = ba.Txn
570+
br.Txn.Status = roachpb.ABORTED
571+
return br, nil
572+
},
573+
// The second EndTxn request from the client is unexpected, so
574+
// return an error response.
575+
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
576+
return nil, roachpb.NewError(errors.Errorf("unexpected request: %v", ba))
577+
},
578+
)
579+
580+
// Kick off the heartbeat loop.
581+
var ba roachpb.BatchRequest
582+
ba.Header = roachpb.Header{Txn: txn.Clone()}
583+
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}})
584+
585+
th.mu.Lock() // manually lock, there's no TxnCoordSender
586+
br, pErr := th.SendLocked(ctx, ba)
587+
th.mu.Unlock()
588+
require.Nil(t, pErr)
589+
require.NotNil(t, br)
590+
591+
// Wait for the heartbeater to abort and send an EndTxn.
592+
<-hbAborted
593+
<-rollbackReady
594+
595+
// Send a rollback from the client. This should be collapsed together
596+
// with the heartbeat abort, and block until it returns. We spawn
597+
// a goroutine to unblock the rollback.
598+
require.NoError(t, th.stopper.RunAsyncTask(ctx, "put", func(ctx context.Context) {
599+
time.Sleep(100 * time.Millisecond)
600+
close(rollbackUnblock)
601+
}))
602+
603+
ba = roachpb.BatchRequest{}
604+
ba.Header = roachpb.Header{Txn: txn.Clone()}
605+
ba.Add(&roachpb.EndTxnRequest{Commit: false})
606+
607+
th.mu.Lock() // manually lock, there's no TxnCoordSender
608+
br, pErr = th.SendLocked(ctx, ba)
609+
th.mu.Unlock()
610+
require.Nil(t, pErr)
611+
require.NotNil(t, br)
612+
613+
// The heartbeat loop should eventually close.
614+
waitForHeartbeatLoopToStop(t, &th)
615+
}
616+
420617
func heartbeaterRunning(th *txnHeartbeater) bool {
421618
th.mu.Lock()
422619
defer th.mu.Unlock()

0 commit comments

Comments
 (0)