Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
109749: kv: disallow clearing aborted errors through ClearRetryableErr r=nvanbenschoten a=nvanbenschoten

This commit adds validation to TxnCoordSender.ClearRetryableErr that the TxnCoordSender is actually in a retryable error state and that the error is not a transaction aborted error.

This was not the cause of cockroachdb#108853, but it's related.

Epic: None
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 30, 2023
2 parents 7ed7f00 + 1de2ac4 commit 9fdbc61
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,13 +1231,18 @@ func (tc *TxnCoordSender) GetRetryableErr(
}

// ClearRetryableErr is part of the TxnSender interface.
func (tc *TxnCoordSender) ClearRetryableErr(ctx context.Context) {
func (tc *TxnCoordSender) ClearRetryableErr(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState == txnRetryableError {
tc.mu.storedRetryableErr = nil
tc.mu.txnState = txnPending
if tc.mu.txnState != txnRetryableError {
return errors.AssertionFailedf("cannot clear retryable error, in state: %s", tc.mu.txnState)
}
if tc.mu.storedRetryableErr.PrevTxnAborted() {
return errors.AssertionFailedf("cannot clear retryable error, txn aborted: %s", tc.mu.txn)
}
tc.mu.txnState = txnPending
tc.mu.storedRetryableErr = nil
return nil
}

// IsSerializablePushAndRefreshNotPossible is part of the kv.TxnSender interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {

// Restart the transaction with a new epoch.
require.Error(t, txn.Sender().GenerateForcedRetryableErr(ctx, s.Clock.Now(), true /* mustRestart */, "force retry"))
txn.Sender().ClearRetryableErr(ctx)
require.NoError(t, txn.Sender().ClearRetryableErr(ctx))

// Now immediately commit.
require.NoError(t, txn.Commit(ctx))
Expand Down Expand Up @@ -2827,15 +2827,15 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.Error(t, txn.Sender().GenerateForcedRetryableErr(ctx, txn.ReadTimestamp().Next(), true /* mustRestart */, "force retry"))
txn.Sender().ClearRetryableErr(ctx)
require.NoError(t, txn.Sender().ClearRetryableErr(ctx))
},
},
{
name: "write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
require.Error(t, txn.Sender().GenerateForcedRetryableErr(ctx, txn.ReadTimestamp().Next(), true /* mustRestart */, "force retry"))
txn.Sender().ClearRetryableErr(ctx)
require.NoError(t, txn.Sender().ClearRetryableErr(ctx))
},
},
{
Expand All @@ -2845,7 +2845,7 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
require.Error(t, txn.Sender().GenerateForcedRetryableErr(ctx, txn.ReadTimestamp().Next(), true /* mustRestart */, "force retry"))
txn.Sender().ClearRetryableErr(ctx)
require.NoError(t, txn.Sender().ClearRetryableErr(ctx))
},
},
} {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func (m *MockTransactionalSender) GetRetryableErr(
}

// ClearRetryableErr is part of the TxnSender interface.
func (m *MockTransactionalSender) ClearRetryableErr(ctx context.Context) {
func (m *MockTransactionalSender) ClearRetryableErr(ctx context.Context) error {
return nil
}

// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ type TxnSender interface {
// TxnSender usable again.
GetRetryableErr(ctx context.Context) *kvpb.TransactionRetryWithProtoRefreshError

// ClearRetryableErr clears the retryable error, if any.
ClearRetryableErr(ctx context.Context)
// ClearRetryableErr clears the retryable error. Returns an error if the
// TxnSender was not in a retryable error state or if the TxnSender was
// aborted.
ClearRetryableErr(ctx context.Context) error

// DisablePipelining instructs the TxnSender not to pipeline
// requests. It should rarely be necessary to call this method. It
Expand Down
6 changes: 2 additions & 4 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
// If the retryable error doesn't correspond to an aborted transaction,
// there's no need to switch out the transaction. We simply clear the
// retryable error and proceed.
txn.mu.sender.ClearRetryableErr(ctx)
return nil
return txn.mu.sender.ClearRetryableErr(ctx)
}

return txn.handleTransactionAbortedErrorLocked(ctx, retryErr)
Expand Down Expand Up @@ -1103,8 +1102,7 @@ func (txn *Txn) PrepareForPartialRetry(ctx context.Context) error {
log.VEventf(ctx, 2, "partially retrying transaction: %s because of a retryable error: %s",
txn.debugNameLocked(), retryErr)

txn.mu.sender.ClearRetryableErr(ctx)
return nil
return txn.mu.sender.ClearRetryableErr(ctx)
}

func (txn *Txn) checkRetryErrorTxnIDLocked(
Expand Down

0 comments on commit 9fdbc61

Please sign in to comment.