Skip to content

Commit 98b53dd

Browse files
committed
kvcoord: fix EndTxn elision race with in-flight requests
`TxnCoordSender` has a fast-path that avoids sending `EndTxn` requests if the transaction has not taken out any locks. However, it relied on `txnPipeliner` to keep track of locks, and it only registers these when responses arrive. Since `EndTxn(commit=false)` requests can be sent asynchronously (e.g. by the client disconnecting), these can arrive while the write request is still in flight, causing `TxnCoordSender` to incorrectly believe the transaction hasn't taken out any locks and elide the `EndTxn` request. This would cause the written intents to be left behind. This patch changes `TxnCoordSender` to keep track of any locking requests that have been sent, and only elides `EndTxn` requests when no locking requests have been seen. `txnCommitter` also elides `EndTxn` requests, but this is done below the `txnFinalizeBarrier` which serializes the requests, so this is safe. Release note (bug fix): Fixed a race condition where transaction cleanup would not be triggered if the transaction was rolled back while all locking requests were still in flight.
1 parent e0db8ed commit 98b53dd

File tree

2 files changed

+60
-2
lines changed

2 files changed

+60
-2
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ type TxnCoordSender struct {
112112
// (a retryable TransactionAbortedError in case of the async abort).
113113
closed bool
114114

115+
// sentLocking is set once this transaction has sent a locking request. It
116+
// is used to elide EndTxn requests via finalizeNonLockingTxnLocked. We
117+
// can't rely on txnPipeliner for this, since it doesn't track locks until
118+
// after responses have been received, and an EndTxn(commit=false) could
119+
// arrive before that happens.
120+
sentLocking bool
121+
115122
// txn is the Transaction proto attached to all the requests and updated on
116123
// all the responses.
117124
txn roachpb.Transaction
@@ -485,7 +492,7 @@ func (tc *TxnCoordSender) Send(
485492
return nil, pErr
486493
}
487494

488-
if ba.IsSingleEndTxnRequest() && !tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
495+
if ba.IsSingleEndTxnRequest() && !tc.mu.sentLocking {
489496
return nil, tc.finalizeNonLockingTxnLocked(ctx, ba)
490497
}
491498

@@ -516,6 +523,10 @@ func (tc *TxnCoordSender) Send(
516523
return nil, nil
517524
}
518525

526+
if !tc.mu.sentLocking && ba.IsLocking() {
527+
tc.mu.sentLocking = true
528+
}
529+
519530
// Clone the Txn's Proto so that future modifications can be made without
520531
// worrying about synchronization.
521532
ba.Txn = tc.mu.txn.Clone()

pkg/kv/kvserver/intent_resolver_integration_test.go

+48-1
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,12 @@ func TestReliableIntentCleanup(t *testing.T) {
401401
testTxnExecute := func(t *testing.T, spec testTxnSpec, txn *kv.Txn) (roachpb.Key, error) {
402402
txnKey := genKey(spec.singleRange)
403403

404+
if spec.finalize == "cancelAsync" && spec.abort != "" {
405+
// This would require coordinating the abort, cancel, and put
406+
// goroutines. Doesn't seem worth the added complexity.
407+
require.Fail(t, "Can't combine finalize=cancelAsync and abort")
408+
}
409+
404410
// If requested, spin off txn aborter goroutines, returning errors
405411
// (if any) via abortErrC.
406412
//
@@ -461,6 +467,33 @@ func TestReliableIntentCleanup(t *testing.T) {
461467
require.Fail(t, "invalid abort type", "abort=%v", spec.abort)
462468
}
463469

470+
// If requested, cancel the context while the put is in flight and
471+
// roll back the txn with the cancelled context (which is what the
472+
// SQL session would do). This is different from finalize=cancel in
473+
// that it's a regression test for EndTxn elision:
474+
// https://github.com/cockroachdb/cockroach/issues/65587
475+
cancelAsyncErrC := make(chan error, 1)
476+
if spec.finalize == "cancelAsync" {
477+
readyC := blockPut(t, txnKey)
478+
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "cancel", func(ctx context.Context) {
479+
unblockC := <-readyC
480+
defer close(unblockC)
481+
defer close(cancelAsyncErrC)
482+
483+
// Roll back with a new, cancelled context and wait to
484+
// unblock Put. This is to provoke EndTxn elision by making
485+
// sure the txn abort executes before the Put request. We
486+
// don't cancel the main context, since that would cause the
487+
// Put to error immediately, racing with the txn abort.
488+
rollbackCtx, rollbackCancel := context.WithCancel(ctx)
489+
rollbackCancel()
490+
if err := txn.Rollback(rollbackCtx); err != nil && !errors.Is(err, context.Canceled) {
491+
cancelAsyncErrC <- err
492+
}
493+
time.Sleep(100 * time.Millisecond)
494+
}))
495+
}
496+
464497
// Write numKeys KV pairs in batches of batchSize as a single txn.
465498
const batchSize = 10000
466499
batch := txn.NewBatch()
@@ -475,6 +508,11 @@ func TestReliableIntentCleanup(t *testing.T) {
475508
return nil, err
476509
}
477510
batch = txn.NewBatch()
511+
// If we did an async cancel, submitting another Put violates
512+
// the txn protocol (i.e. the client has already gone away).
513+
if spec.finalize == "cancelAsync" {
514+
break
515+
}
478516
}
479517
}
480518

@@ -499,6 +537,10 @@ func TestReliableIntentCleanup(t *testing.T) {
499537
if err := txn.Rollback(rollbackCtx); err != nil && !errors.Is(err, context.Canceled) {
500538
return nil, err
501539
}
540+
case "cancelAsync":
541+
if err := <-cancelAsyncErrC; err != nil {
542+
return nil, err
543+
}
502544
default:
503545
require.Fail(t, "invalid finalize type", "finalize=%v", spec.finalize)
504546
}
@@ -577,9 +619,14 @@ func TestReliableIntentCleanup(t *testing.T) {
577619
})
578620
return
579621
}
580-
finalize := []interface{}{"commit", "rollback", "cancel"}
622+
finalize := []interface{}{"commit", "rollback", "cancel", "cancelAsync"}
581623
testutils.RunValues(t, "finalize", finalize, func(t *testing.T, finalize interface{}) {
582624
abort := []interface{}{"no", "push", "heartbeat"}
625+
if finalize == "cancelAsync" {
626+
// Async cancel can't currently run together with an
627+
// abort, since we'd have to coordinate the tasks.
628+
abort = []interface{}{"no"}
629+
}
583630
testutils.RunValues(t, "abort", abort, func(t *testing.T, abort interface{}) {
584631
if abort.(string) == "no" {
585632
abort = "" // "no" just makes the test output better

0 commit comments

Comments
 (0)