Skip to content

Commit b367897

Browse files
craig[bot]erikgrinakeryuzefovich
committed
Merge #69966 #70210
69966: kvcoord: avoid concurrent rollbacks when making parallel commits explicit r=nvanbenschoten,andreimatei a=erikgrinaker `TxnCoordSender` allows `EndTxn(commit=false)` rollback requests even if the transaction state is finalized, since clients can send multiple rollbacks (e.g. due to context cancellation). However, it allowed this even when the transaction was committed. This could pass the request through while the `txnCommitter` was asynchronously making an implicit commit explicit, which would violate the `txnLockGatekeeper` requirement that transaction requests are synchronous (non-concurrent) which would return an unexpected error for the rollback. This patch rejects additional `EndTxn(commit=false)` requests if the finalized transaction is known to be committed, to prevent this race condition. If rejected, the returned error is of the same type that would be returned by `EndTxn` evaluation, although with a different message string. Note that even though the returned error should really have `REASON_TXN_COMMITTED` in this case, which is also what `txn.Rollback()` expects in order to omit logging, the current `EndTxn` code incorrectly returns `REASON_TXN_UNKNOWN` in this case. This behavior is retained to minimize the change, but should be corrected separately. Resolves #68643. Informs #69965. Release justification: fixes for high-priority or high-severity bugs in existing functionality Release note: None 70210: rowexec: fix a ctx NPE in couple of processors in edge cases r=yuzefovich a=yuzefovich We recently extended the statistics that are collected during EXPLAIN ANALYZE runs to include the "scan stats". However, in the joinReader and the tableReader those are collected when closing the processor based on the context argument, and in some edge cases (like when `Start` was never called) might be nil leading to a NPE. This commit fixes the problem by unifying all processors to collect all execution stats in `execStatsForTrace` (which isn't called in the edge case mentioned above). Fixes: #70075. Fixes: #70107. Release note: None (no release with this bug) Co-authored-by: Erik Grinaker <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
3 parents f26d7ad + 40ab220 + 26b4cd5 commit b367897

File tree

4 files changed

+86
-9
lines changed

4 files changed

+86
-9
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -645,11 +645,16 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
645645
func (tc *TxnCoordSender) maybeRejectClientLocked(
646646
ctx context.Context, ba *roachpb.BatchRequest,
647647
) *roachpb.Error {
648-
if ba != nil && ba.IsSingleAbortTxnRequest() {
648+
if ba != nil && ba.IsSingleAbortTxnRequest() && tc.mu.txn.Status != roachpb.COMMITTED {
649649
// As a special case, we allow rollbacks to be sent at any time. Any
650650
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
651651
// layers are free to retry rollbacks if they want (and they do, for
652652
// example, when the context was canceled while txn.Rollback() was running).
653+
//
654+
// However, we reject this if we know that the transaction has been
655+
// committed, to avoid sending the rollback concurrently with the
656+
// txnCommitter asynchronously making the commit explicit. See:
657+
// https://github.com/cockroachdb/cockroach/issues/68643
653658
return nil
654659
}
655660

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"fmt"
1717
"reflect"
1818
"strconv"
19+
"sync"
1920
"sync/atomic"
2021
"testing"
2122
"time"
@@ -379,6 +380,83 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
379380
}
380381
}
381382

383+
// TestTxnCoordSenderCommitCanceled is a regression test for
384+
// https://github.com/cockroachdb/cockroach/issues/68643. It makes sure that an
385+
// EndTxn(commit=false) sent by the caller in response to a client context
386+
// cancellation isn't passed through TxnCoordSender concurrently with an
387+
// asynchronous EndTxn(commit=true) request sent by txnCommitter to make an
388+
// implicitly committed transaction explicit.
389+
func TestTxnCoordSenderCommitCanceled(t *testing.T) {
390+
defer leaktest.AfterTest(t)()
391+
defer log.Scope(t).Close(t)
392+
393+
ctx := context.Background()
394+
395+
// blockCommits is used to block commit responses for a given txn. The key is
396+
// a txn ID, and the value is a ready channel (chan struct) that will be
397+
// closed when the commit has been received and blocked.
398+
var blockCommits sync.Map
399+
responseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
400+
if arg, ok := ba.GetArg(roachpb.EndTxn); ok && ba.Txn != nil {
401+
et := arg.(*roachpb.EndTxnRequest)
402+
readyC, ok := blockCommits.Load(ba.Txn.ID)
403+
if ok && et.Commit && len(et.InFlightWrites) == 0 {
404+
close(readyC.(chan struct{})) // notify test that commit is received and blocked
405+
<-ctx.Done() // wait for test to complete (NB: not the passed context)
406+
}
407+
}
408+
return nil
409+
}
410+
411+
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
412+
TestingResponseFilter: responseFilter,
413+
})
414+
defer s.Stop()
415+
ctx, _ = s.Stopper().WithCancelOnQuiesce(ctx)
416+
417+
// Set up a new txn, and write a couple of values.
418+
txn := kv.NewTxn(ctx, s.DB, 0)
419+
require.NoError(t, txn.Put(ctx, "a", "1"))
420+
require.NoError(t, txn.Put(ctx, "b", "2"))
421+
422+
// Read back a. This is crucial to reproduce the original bug. We need
423+
// txnPipeliner to record the lock in its lock footprint, but it doesn't do
424+
// that if the intents are proven together with the commit EndTxn request
425+
// (because it incorrectly assumes no further requests will be sent). If the
426+
// lock footprint isn't updated, the TxnCoordSender will incorrectly believe
427+
// the txn hasn't taken out any locks, and will elide the final
428+
// EndTxn(commit=false) rollback request. For details, see:
429+
// https://github.com/cockroachdb/cockroach/issues/68643
430+
_, err := txn.Get(ctx, "a")
431+
require.NoError(t, err)
432+
433+
// Commit the transaction, but ask the response filter to block the final
434+
// async commit sent by txnCommitter to make the implicit commit explicit.
435+
readyC := make(chan struct{})
436+
blockCommits.Store(txn.ID(), readyC)
437+
require.NoError(t, txn.Commit(ctx))
438+
<-readyC
439+
440+
// From the TxnCoordSender's point of view, the txn is implicitly committed,
441+
// and the commit response is on its way back up the stack. However, if the
442+
// client were to disconnect before receiving the response (canceling the
443+
// context), and something rolls back the transaction because of that, then
444+
// txn.Rollback() would send an asynchronous rollback request using a separate
445+
// context.
446+
//
447+
// However, this is hard to test since txn.Rollback() in this case sends the
448+
// EndTxn(commit=false) async. We instead replicate what Txn.Rollback() would
449+
// do here (i.e. send a EndTxn(commit=false)) and assert that we receive the
450+
// expected error.
451+
var ba roachpb.BatchRequest
452+
ba.Add(&roachpb.EndTxnRequest{Commit: false})
453+
_, pErr := txn.Send(ctx, ba)
454+
require.NotNil(t, pErr)
455+
require.IsType(t, &roachpb.TransactionStatusError{}, pErr.GetDetail())
456+
// TODO(erikgrinaker): This should really assert REASON_TXN_COMMITTED, but
457+
// we return REASON_TXN_UNKNOWN to preserve existing EndTxn behavior.
458+
}
459+
382460
// TestTxnCoordSenderAddLockOnError verifies that locks are tracked if the
383461
// transaction is, even on error.
384462
func TestTxnCoordSenderAddLockOnError(t *testing.T) {

pkg/sql/rowexec/joinreader.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -883,11 +883,6 @@ func (jr *joinReader) ConsumerClosed() {
883883
}
884884

885885
func (jr *joinReader) close() {
886-
// Make sure to clone any tracing span so that stats can pick it up later.
887-
// Stats are only collected after we finish closing the processor.
888-
if !jr.Closed {
889-
jr.scanStats = execinfra.GetScanStats(jr.Ctx)
890-
}
891886
if jr.InternalClose() {
892887
if jr.fetcher != nil {
893888
jr.fetcher.Close(jr.Ctx)
@@ -918,6 +913,7 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats {
918913
}
919914

920915
// TODO(asubiotto): Add memory and disk usage to EXPLAIN ANALYZE.
916+
jr.scanStats = execinfra.GetScanStats(jr.Ctx)
921917
ret := &execinfrapb.ComponentStats{
922918
Inputs: []execinfrapb.InputStats{is},
923919
KV: execinfrapb.KVStats{

pkg/sql/rowexec/tablereader.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,6 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
299299

300300
func (tr *tableReader) close() {
301301
if tr.InternalClose() {
302-
// scanStats is collected from the trace after we finish doing work for this
303-
// join.
304-
tr.scanStats = execinfra.GetScanStats(tr.Ctx)
305302
if tr.fetcher != nil {
306303
tr.fetcher.Close(tr.Ctx)
307304
}
@@ -319,6 +316,7 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats {
319316
if !ok {
320317
return nil
321318
}
319+
tr.scanStats = execinfra.GetScanStats(tr.Ctx)
322320
ret := &execinfrapb.ComponentStats{
323321
KV: execinfrapb.KVStats{
324322
BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())),

0 commit comments

Comments
 (0)