Skip to content

Commit

Permalink
Merge #102098
Browse files Browse the repository at this point in the history
102098: kv: support Rollback in Txn closure r=arulajmani a=nvanbenschoten

This commit adds support for the following pattern in a transaction closure:

```go
db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
    return txn.Rollback(ctx)
})
```

Surprisingly, this would previously return an error:
```
TransactionStatusError: client already committed or rolled back the transaction. Trying to execute: 1 EndTxn (REASON_UNKNOWN)
```

This was because the txn retry loop in `txn.exec` would attempt to commit the rolled back transaction and encounter the `TransactionStatusError`. The loop is now made aware of the client finalization state, and uses this to decide whether to auto-commit.

Epic: None
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 24, 2023
2 parents cfa9ede + 69749fe commit 71c3539
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 20 deletions.
28 changes: 25 additions & 3 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,11 @@ func TestDB_DelRange(t *testing.T) {
func TestTxn_Commit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
defer s.Stopper().Stop(ctx)

err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Put("aa", "1")
b.Put("ab", "2")
Expand All @@ -575,7 +576,7 @@ func TestTxn_Commit(t *testing.T) {
b := &kv.Batch{}
b.Get("aa")
b.Get("ab")
if err := db.Run(context.Background(), b); err != nil {
if err := db.Run(ctx, b); err != nil {
t.Fatal(err)
}
expected := map[string][]byte{
Expand All @@ -585,6 +586,27 @@ func TestTxn_Commit(t *testing.T) {
checkResults(t, expected, b.Results)
}

func TestTxn_Rollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, db := setup(t)
defer s.Stopper().Stop(ctx)

err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, "a", "1"); err != nil {
return err
}
return txn.Rollback(ctx)
})
require.NoError(t, err)

// Check that the transaction was rolled back.
r, err := db.Get(ctx, "a")
require.NoError(t, err)
require.False(t, r.Exists())
}

func TestDB_Put_insecure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,13 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
return nil
}

// ClientFinalized is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) ClientFinalized() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.mu.txnState == txnFinalized
}

// finalizeAndCleanupTxnLocked marks the transaction state as finalized and
// closes all interceptors.
func (tc *TxnCoordSender) finalizeAndCleanupTxnLocked(ctx context.Context) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (m *MockTransactionalSender) TxnStatus() roachpb.TransactionStatus {
return m.txn.Status
}

// ClientFinalized is part of the TxnSender interface.
func (m *MockTransactionalSender) ClientFinalized() bool {
return m.txn.Status.IsFinalized()
}

// SetIsoLevel is part of the TxnSender interface.
func (m *MockTransactionalSender) SetIsoLevel(isoLevel isolation.Level) error {
m.txn.IsoLevel = isoLevel
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ type TxnSender interface {
// TxnStatus exports the txn's status.
TxnStatus() roachpb.TransactionStatus

// ClientFinalized returns true is the client has issued an EndTxn
// request in an attempt to finalize the transaction. Once finalized,
// further batches except EndTxn(commit=false) will be rejected.
ClientFinalized() bool

// CreateSavepoint establishes a savepoint.
// This method is only valid when called on RootTxns.
//
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ func (txn *Txn) IsOpen() bool {
return txn.statusLocked() == roachpb.PENDING
}

// isClientFinalized returns true if the client has issued an EndTxn request in
// an attempt to finalize the transaction.
func (txn *Txn) isClientFinalized() bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.ClientFinalized()
}

// SetIsoLevel sets the transaction's isolation level. Transactions default to
// Serializable isolation. The isolation must be set before any operations are
// performed on the transaction.
Expand Down Expand Up @@ -941,10 +949,11 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}
err = fn(ctx, txn)

// Commit on success, unless the txn has already been committed by the
// closure. We allow that, as closure might want to run 1PC transactions.
// Commit on success, unless the txn has already been committed or rolled
// back by the closure. We allow that, as the closure might want to run 1PC
// transactions or might want to rollback on certain conditions.
if err == nil {
if !txn.IsCommitted() {
if !txn.isClientFinalized() {
err = txn.Commit(ctx)
log.Eventf(ctx, "kv.Txn did AutoCommit. err: %v", err)
if err != nil {
Expand Down Expand Up @@ -976,6 +985,12 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// of handling it.
return errors.Wrapf(err, "retryable error from another txn")
}
if txn.isClientFinalized() {
// We've already committed or rolled back, so we can't retry. The
// closure should not have returned a retryable error in this case.
return errors.NewAssertionErrorWithWrappedErrf(err,
"client already committed or rolled back")
}
retryable = true
}
}
Expand Down
24 changes: 10 additions & 14 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,22 +602,18 @@ func TestGenerateForcedRetryableErrorAfterRollback(t *testing.T) {
}
var i int
txnErr := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
e1 := txn.Put(ctx, mkKey("a"), 1)
i++
require.LessOrEqual(t, i, 2)
if i == 1 {
require.NoError(t, e1)
// Prepare an error to return after the rollback.
retryErr := txn.GenerateForcedRetryableError(ctx, "force retry")
// Rolling back completes the transaction, returning an error is invalid.
require.NoError(t, txn.Rollback(ctx))
return retryErr
} else {
require.ErrorContains(t, e1, "TransactionStatusError", i)
return nil
}
require.Equal(t, 1, i)
e1 := txn.Put(ctx, mkKey("a"), 1)
require.NoError(t, e1)
// Prepare an error to return after the rollback.
retryErr := txn.GenerateForcedRetryableError(ctx, "force retry")
// Rolling back completes the transaction, returning an error is invalid.
require.NoError(t, txn.Rollback(ctx))
return retryErr
})
require.ErrorContains(t, txnErr, "TransactionStatusError")
require.ErrorContains(t, txnErr, "client already committed or rolled back")
require.True(t, errors.IsAssertionFailure(txnErr))
checkKey(t, "a", 0)
}

Expand Down

0 comments on commit 71c3539

Please sign in to comment.