Skip to content

Commit 10d76b9

Browse files
committed
kvcoord: avoid concurrent rollbacks when making parallel commits explicit
`TxnCoordSender` allows `EndTxn(commit=false)` rollback requests even if the transaction state is finalized, since clients can send multiple rollbacks (e.g. due to context cancellation). However, it allowed this even when the transaction was committed. This could pass the request through while the `txnCommitter` was asynchronously making an implicit commit explicit, which would violate the `txnLockGatekeeper` requirement that transaction requests are synchronous (non-concurrent) which would return an unexpected error for the rollback. This patch rejects additional `EndTxn(commit=false)` requests if the finalized transaction is known to be committed, to prevent this race condition. If rejected, the returned error is of the same type that would be returned by `EndTxn` evaluation, although with a different message string. Note that even though the returned error should really have `REASON_TXN_COMMITTED` in this case, which is also what `txn.Rollback()` expects in order to omit logging, the current `EndTxn` code incorrectly returns `REASON_TXN_UNKNOWN` in this case. This behavior is retained to minimize the change, but should be corrected separately. Release justification: fixes for high-priority or high-severity bugs in existing functionality Release note: None
1 parent d3fc366 commit 10d76b9

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -645,11 +645,16 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
645645
func (tc *TxnCoordSender) maybeRejectClientLocked(
646646
ctx context.Context, ba *roachpb.BatchRequest,
647647
) *roachpb.Error {
648-
if ba != nil && ba.IsSingleAbortTxnRequest() {
648+
if ba != nil && ba.IsSingleAbortTxnRequest() && tc.mu.txn.Status != roachpb.COMMITTED {
649649
// As a special case, we allow rollbacks to be sent at any time. Any
650650
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
651651
// layers are free to retry rollbacks if they want (and they do, for
652652
// example, when the context was canceled while txn.Rollback() was running).
653+
//
654+
// However, we reject this if we know that the transaction has been
655+
// committed, to avoid sending the rollback concurrently with the
656+
// txnCommitter asynchronously making the commit explicit. See:
657+
// https://github.com/cockroachdb/cockroach/issues/68643
653658
return nil
654659
}
655660

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"fmt"
1717
"reflect"
1818
"strconv"
19+
"sync"
1920
"sync/atomic"
2021
"testing"
2122
"time"
@@ -379,6 +380,83 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
379380
}
380381
}
381382

383+
// TestTxnCoordSenderCommitCanceled is a regression test for
384+
// https://github.com/cockroachdb/cockroach/issues/68643. It makes sure that an
385+
// EndTxn(commit=false) sent by the caller in response to a client context
386+
// cancellation isn't passed through TxnCoordSender concurrently with an
387+
// asynchronous EndTxn(commit=true) request sent by txnCommitter to make an
388+
// implicitly committed transaction explicit.
389+
func TestTxnCoordSenderCommitCanceled(t *testing.T) {
390+
defer leaktest.AfterTest(t)()
391+
defer log.Scope(t).Close(t)
392+
393+
ctx := context.Background()
394+
395+
// blockCommits is used to block commit responses for a given txn. The key is
396+
// a txn ID, and the value is a ready channel (chan struct) that will be
397+
// closed when the commit has been received and blocked.
398+
var blockCommits sync.Map
399+
responseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
400+
if arg, ok := ba.GetArg(roachpb.EndTxn); ok && ba.Txn != nil {
401+
et := arg.(*roachpb.EndTxnRequest)
402+
readyC, ok := blockCommits.Load(ba.Txn.ID)
403+
if ok && et.Commit && len(et.InFlightWrites) == 0 {
404+
close(readyC.(chan struct{})) // notify test that commit is received and blocked
405+
<-ctx.Done() // wait for test to complete (NB: not the passed context)
406+
}
407+
}
408+
return nil
409+
}
410+
411+
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
412+
TestingResponseFilter: responseFilter,
413+
})
414+
defer s.Stop()
415+
ctx, _ = s.Stopper().WithCancelOnQuiesce(ctx)
416+
417+
// Set up a new txn, and write a couple of values.
418+
txn := kv.NewTxn(ctx, s.DB, 0)
419+
require.NoError(t, txn.Put(ctx, "a", "1"))
420+
require.NoError(t, txn.Put(ctx, "b", "2"))
421+
422+
// Read back a. This is crucial to reproduce the original bug. We need
423+
// txnPipeliner to record the lock in its lock footprint, but it doesn't do
424+
// that if the intents are proven together with the commit EndTxn request
425+
// (because it incorrectly assumes no further requests will be sent). If the
426+
// lock footprint isn't updated, the TxnCoordSender will incorrectly believe
427+
// the txn hasn't taken out any locks, and will elide the final
428+
// EndTxn(commit=false) rollback request. For details, see:
429+
// https://github.com/cockroachdb/cockroach/issues/68643
430+
_, err := txn.Get(ctx, "a")
431+
require.NoError(t, err)
432+
433+
// Commit the transaction, but ask the response filter to block the final
434+
// async commit sent by txnCommitter to make the implicit commit explicit.
435+
readyC := make(chan struct{})
436+
blockCommits.Store(txn.ID(), readyC)
437+
require.NoError(t, txn.Commit(ctx))
438+
<-readyC
439+
440+
// From the TxnCoordSender's point of view, the txn is implicitly committed,
441+
// and the commit response is on its way back up the stack. However, if the
442+
// client were to disconnect before receiving the response (canceling the
443+
// context), and something rolls back the transaction because of that, then
444+
// txn.Rollback() would send an asynchronous rollback request using a separate
445+
// context.
446+
//
447+
// However, this is hard to test since txn.Rollback() in this case sends the
448+
// EndTxn(commit=false) async. We instead replicate what Txn.Rollback() would
449+
// do here (i.e. send a EndTxn(commit=false)) and assert that we receive the
450+
// expected error.
451+
var ba roachpb.BatchRequest
452+
ba.Add(&roachpb.EndTxnRequest{Commit: false})
453+
_, pErr := txn.Send(ctx, ba)
454+
require.NotNil(t, pErr)
455+
require.IsType(t, &roachpb.TransactionStatusError{}, pErr.GetDetail())
456+
// TODO(erikgrinaker): This should really assert REASON_TXN_COMMITTED, but
457+
// we return REASON_TXN_UNKNOWN to preserve existing EndTxn behavior.
458+
}
459+
382460
// TestTxnCoordSenderAddLockOnError verifies that locks are tracked if the
383461
// transaction is, even on error.
384462
func TestTxnCoordSenderAddLockOnError(t *testing.T) {

0 commit comments

Comments
 (0)