Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: kvcoord: avoid concurrent rollbacks when making parallel commits explicit #70227

Merged
merged 1 commit into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,16 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
if ba != nil && ba.IsSingleAbortTxnRequest() {
if ba != nil && ba.IsSingleAbortTxnRequest() && tc.mu.txn.Status != roachpb.COMMITTED {
// As a special case, we allow rollbacks to be sent at any time. Any
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
// layers are free to retry rollbacks if they want (and they do, for
// example, when the context was canceled while txn.Rollback() was running).
//
// However, we reject this if we know that the transaction has been
// committed, to avoid sending the rollback concurrently with the
// txnCommitter asynchronously making the commit explicit. See:
// https://github.com/cockroachdb/cockroach/issues/68643
return nil
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -379,6 +380,83 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
}
}

// TestTxnCoordSenderCommitCanceled is a regression test for
// https://github.com/cockroachdb/cockroach/issues/68643. It makes sure that an
// EndTxn(commit=false) sent by the caller in response to a client context
// cancellation isn't passed through TxnCoordSender concurrently with an
// asynchronous EndTxn(commit=true) request sent by txnCommitter to make an
// implicitly committed transaction explicit.
func TestTxnCoordSenderCommitCanceled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// blockCommits is used to block commit responses for a given txn. The key is
// a txn ID, and the value is a ready channel (chan struct) that will be
// closed when the commit has been received and blocked.
var blockCommits sync.Map
responseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if arg, ok := ba.GetArg(roachpb.EndTxn); ok && ba.Txn != nil {
et := arg.(*roachpb.EndTxnRequest)
readyC, ok := blockCommits.Load(ba.Txn.ID)
if ok && et.Commit && len(et.InFlightWrites) == 0 {
close(readyC.(chan struct{})) // notify test that commit is received and blocked
<-ctx.Done() // wait for test to complete (NB: not the passed context)
}
}
return nil
}

s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
TestingResponseFilter: responseFilter,
})
defer s.Stop()
ctx, _ = s.Stopper().WithCancelOnQuiesce(ctx)

// Set up a new txn, and write a couple of values.
txn := kv.NewTxn(ctx, s.DB, 0)
require.NoError(t, txn.Put(ctx, "a", "1"))
require.NoError(t, txn.Put(ctx, "b", "2"))

// Read back a. This is crucial to reproduce the original bug. We need
// txnPipeliner to record the lock in its lock footprint, but it doesn't do
// that if the intents are proven together with the commit EndTxn request
// (because it incorrectly assumes no further requests will be sent). If the
// lock footprint isn't updated, the TxnCoordSender will incorrectly believe
// the txn hasn't taken out any locks, and will elide the final
// EndTxn(commit=false) rollback request. For details, see:
// https://github.com/cockroachdb/cockroach/issues/68643
_, err := txn.Get(ctx, "a")
require.NoError(t, err)

// Commit the transaction, but ask the response filter to block the final
// async commit sent by txnCommitter to make the implicit commit explicit.
readyC := make(chan struct{})
blockCommits.Store(txn.ID(), readyC)
require.NoError(t, txn.Commit(ctx))
<-readyC

// From the TxnCoordSender's point of view, the txn is implicitly committed,
// and the commit response is on its way back up the stack. However, if the
// client were to disconnect before receiving the response (canceling the
// context), and something rolls back the transaction because of that, then
// txn.Rollback() would send an asynchronous rollback request using a separate
// context.
//
// However, this is hard to test since txn.Rollback() in this case sends the
// EndTxn(commit=false) async. We instead replicate what Txn.Rollback() would
// do here (i.e. send a EndTxn(commit=false)) and assert that we receive the
// expected error.
var ba roachpb.BatchRequest
ba.Add(&roachpb.EndTxnRequest{Commit: false})
_, pErr := txn.Send(ctx, ba)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.TransactionStatusError{}, pErr.GetDetail())
// TODO(erikgrinaker): This should really assert REASON_TXN_COMMITTED, but
// we return REASON_TXN_UNKNOWN to preserve existing EndTxn behavior.
}

// TestTxnCoordSenderAddLockOnError verifies that locks are tracked if the
// transaction is, even on error.
func TestTxnCoordSenderAddLockOnError(t *testing.T) {
Expand Down