Skip to content

Commit

Permalink
kv: test for issue #22615
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
lidorcarmel committed Jan 6, 2022
1 parent e299a3e commit 96be6cc
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
122 changes: 122 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,125 @@ func TestDBDecommissionedOperations(t *testing.T) {
})
}
}

func TestDB_TxnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, "aa", "1"))
require.NoError(t, txn.Put(ctx, "bb", "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, e := hpTxn.Get(ctx, "aa")
require.NoError(t, e)
if !r.Exists() {
require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, e := txn.Get(ctx, "aa")
if runNumber == 0 {
// First run, we should get a retryable error.
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())
} else {
// The retry should succeed.
require.NoError(t, e)
require.Equal(t, []byte("1"), r.ValueBytes())
}
runNumber++

// Return the retryable error.
return e
})
require.NoError(t, err)

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn was overridden by the retry.
kv, e1 := txn.Get(ctx, "aa")
require.NoError(t, e1)
require.Equal(t, []byte("1"), kv.ValueBytes())

// The retry succeeded.
kv, e2 := txn.Get(ctx, "bb")
require.NoError(t, e2)
require.Equal(t, []byte("1"), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
}

func TestDB_TxnReturnNil(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, "aa", "1"))
require.NoError(t, txn.Put(ctx, "bb", "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, e := hpTxn.Get(ctx, "aa")
require.NoError(t, e)
if !r.Exists() {
require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, e := txn.Get(ctx, "aa")
require.Zero(t, runNumber)
// First and only run, we should get a retryable error.
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())
runNumber++

// At this point txn is poisoned, and any op returns the same (poisoning) error.
r, e = txn.Get(ctx, "bb")
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())

// Return nil - the retry loop will not retry.
return nil
})
// db.Txn should return the retryable error that poisoned txn.
expectedErr := (*roachpb.TransactionRetryWithProtoRefreshError)(nil)
require.True(t, errors.As(err, &expectedErr))

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn succeeded.
kv, e1 := txn.Get(ctx, "aa")
require.NoError(t, e1)
require.Equal(t, []byte("hp txn"), kv.ValueBytes())

// Main txn failed.
kv, e2 := txn.Get(ctx, "bb")
require.NoError(t, e2)
require.Equal(t, []byte(nil), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
}
38 changes: 38 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Txn struct {
// The txn has to be committed by this deadline. A nil value indicates no
// deadline.
deadline *hlc.Timestamp

poisonErr error
}

// admissionHeader is used for admission control for work done in this
Expand Down Expand Up @@ -840,13 +842,29 @@ func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
}

func (txn *Txn) setPoisonedLocked(ctx context.Context, err error) {
log.VEventf(ctx, 2, "poisoning txn: %v", err)
txn.mu.poisonErr = err
}

func (txn *Txn) clearPoisonedLocked(ctx context.Context) {
if txn.mu.poisonErr != nil {
log.VEventf(ctx, 2, "clearing poisoning: %v", txn.mu.poisonErr)
txn.mu.poisonErr = nil
}
}

// Rollback sends an EndTxnRequest with Commit=false.
// txn is considered finalized and cannot be used to send any more commands.
func (txn *Txn) Rollback(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(errors.AssertionFailedf("Rollback() called on leaf txn"), ctx)
}

txn.mu.Lock()
txn.clearPoisonedLocked(ctx)
txn.mu.Unlock()

return txn.rollback(ctx).GoError()
}

Expand Down Expand Up @@ -974,6 +992,14 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// Commit on success, unless the txn has already been committed by the
// closure. We allow that, as closure might want to run 1PC transactions.
if err == nil {
// fn returned nil but something went wrong and txn is poisoned, therefore we fail.
txn.mu.Lock()
poisonErr := txn.mu.poisonErr
txn.mu.Unlock()
if poisonErr != nil {
log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
return poisonErr
}
if !txn.IsCommitted() {
err = txn.Commit(ctx)
log.Eventf(ctx, "client.Txn did AutoCommit. err: %v", err)
Expand Down Expand Up @@ -1030,6 +1056,11 @@ func (txn *Txn) PrepareForRetry(ctx context.Context, err error) {
txn.commitTriggers = nil
log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s",
txn.DebugName(), err)

// Clear poisoning for the retry.
txn.mu.Lock()
txn.clearPoisonedLocked(ctx)
txn.mu.Unlock()
}

// IsRetryableErrMeantForTxn returns true if err is a retryable
Expand Down Expand Up @@ -1091,7 +1122,13 @@ func (txn *Txn) Send(
txn.mu.Lock()
requestTxnID := txn.mu.ID
sender := txn.mu.sender
poisonErr := txn.mu.poisonErr
txn.mu.Unlock()
if poisonErr != nil {
// All ops should fail because txn is poisoned.
log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
return nil, roachpb.NewError(poisonErr)
}
br, pErr := txn.db.sendUsingSender(ctx, ba, sender)
if pErr == nil {
return br, nil
Expand Down Expand Up @@ -1120,6 +1157,7 @@ func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) {
if !errors.As(err, &retryErr) {
return
}
txn.setPoisonedLocked(ctx, err)
txn.resetDeadlineLocked()
txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
}
Expand Down

0 comments on commit 96be6cc

Please sign in to comment.