sql,kv: bubble up retry errors when creating leaf transactions #98713

Merged (1 commit) on Mar 16, 2023
7 changes: 7 additions & 0 deletions pkg/kv/kvpb/data.go
@@ -102,6 +102,13 @@ func PrepareTransactionForRetry(
case *WriteTooOldError:
// Increase the timestamp to the ts at which we've actually written.
txn.WriteTimestamp.Forward(tErr.RetryTimestamp())
case *IntentMissingError:
// IntentMissingErrors are not expected to be handled at this level;
// we instead expect the txnPipeliner to transform them into a
// TransactionRetryError(RETRY_ASYNC_WRITE_FAILURE).
log.Fatalf(
ctx, "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail(),
)
default:
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
}
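For reference, a conceptual sketch of the transformation the new comment relies on; this is not the actual txnPipeliner code, and the helper name and error message are assumptions. It illustrates how an IntentMissingError from a pipelined write would be rewritten into a retryable TransactionRetryError(RETRY_ASYNC_WRITE_FAILURE) before it can ever reach PrepareTransactionForRetry.

```go
package example

import "github.com/cockroachdb/cockroach/pkg/kv/kvpb"

// transformIntentMissingErr is a hypothetical stand-in for the txnPipeliner's
// behavior: an IntentMissingError is rewritten into a retryable
// TransactionRetryError with reason RETRY_ASYNC_WRITE_FAILURE, so that
// PrepareTransactionForRetry never sees the raw IntentMissingError.
func transformIntentMissingErr(pErr *kvpb.Error) *kvpb.Error {
	if _, ok := pErr.GetDetail().(*kvpb.IntentMissingError); ok {
		return kvpb.NewErrorWithTxn(
			kvpb.NewTransactionRetryError(
				kvpb.RETRY_ASYNC_WRITE_FAILURE,
				"missing intent on pipelined write"),
			pErr.GetTxn())
	}
	return pErr
}
```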
30 changes: 12 additions & 18 deletions pkg/kv/txn.go
@@ -993,7 +993,8 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
}
log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
txn.debugNameLocked(), retryErr)
txn.handleRetryableErrLocked(ctx, retryErr)
txn.resetDeadlineLocked()
txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
}

// IsRetryableErrMeantForTxn returns true if err is a retryable
@@ -1073,13 +1074,6 @@ func (txn *Txn) Send(
return br, pErr
}

func (txn *Txn) handleRetryableErrLocked(
ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError,
) {
txn.resetDeadlineLocked()
txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
}

// NegotiateAndSend is a specialized version of Send that is capable of
// orchestrating a bounded-staleness read through the transaction, given a
// read-only BatchRequest with a min_timestamp_bound set in its Header.
@@ -1251,10 +1245,11 @@ func (txn *Txn) GetLeafTxnInputState(ctx context.Context) *roachpb.LeafTxnInputS

// GetLeafTxnInputStateOrRejectClient is like GetLeafTxnInputState
// except, if the transaction is already aborted or otherwise in state
// that cannot make progress, it returns an error. If the transaction
// is aborted, the error will be a retryable one, and the transaction
// will have been prepared for another transaction attempt (so, on
// retryable errors, it acts like Send()).
// that cannot make progress, it returns an error. If the transaction aborted,
// the error returned will be a retryable one; as such, the caller is
// responsible for handling the error before another attempt by calling
// PrepareForRetry. Use of the transaction before doing so will continue to be
// rejected.
func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
ctx context.Context,
) (*roachpb.LeafTxnInputState, error) {
@@ -1267,10 +1262,6 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
defer txn.mu.Unlock()
tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending)
if err != nil {
var retryErr *kvpb.TransactionRetryWithProtoRefreshError
if errors.As(err, &retryErr) {
txn.handleRetryableErrLocked(ctx, retryErr)
}
return nil, err
}
return tfs, nil
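A minimal caller-side sketch of the contract described in the new comment; the helper name and the retry-loop shape are hypothetical, but the calls used are the ones shown in this diff. The retryable error is bubbled up to whoever drives the attempts, and PrepareForRetry must be called before the transaction is used again.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// runWithLeafRetry is a hypothetical retry loop: a retryable error from
// GetLeafTxnInputStateOrRejectClient is no longer handled internally, so the
// driver of the attempts calls PrepareForRetry before trying again. Until
// then, further use of the txn continues to be rejected.
func runWithLeafRetry(
	ctx context.Context, txn *kv.Txn, attempt func(*roachpb.LeafTxnInputState) error,
) error {
	for {
		tis, err := txn.GetLeafTxnInputStateOrRejectClient(ctx)
		if err == nil {
			err = attempt(tis)
		}
		if err == nil {
			return nil
		}
		var retryErr *kvpb.TransactionRetryWithProtoRefreshError
		if !errors.As(err, &retryErr) || !txn.IsRetryableErrMeantForTxn(*retryErr) {
			// Not a retryable error meant for this txn; give up.
			return err
		}
		// Reset the txn for a new attempt.
		txn.PrepareForRetry(ctx)
	}
}
```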
@@ -1339,8 +1330,6 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
}

pErr = txn.mu.sender.UpdateStateOnRemoteRetryableErr(ctx, pErr)
txn.replaceRootSenderIfTxnAbortedLocked(ctx, pErr.GetDetail().(*kvpb.TransactionRetryWithProtoRefreshError), origTxnID)

return pErr.GoError()
}

@@ -1350,6 +1339,11 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
//
// origTxnID is the id of the txn that generated retryErr. Note that this can be
// different from retryErr.Transaction - the latter might be a new transaction.
//
// TODO(arul): Now that we only expect this to happen on the PrepareForRetry
// path, by design, should we just inline this function? Some of the handling
// of non-aborted transactions in this function feels a bit out of place with
// the new code structure.
func (txn *Txn) replaceRootSenderIfTxnAbortedLocked(
ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError, origTxnID uuid.UUID,
) {
69 changes: 69 additions & 0 deletions pkg/kv/txn_external_test.go
@@ -703,3 +703,72 @@ func TestGenerateForcedRetryableErrorByPoisoning(t *testing.T) {
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}

// TestUpdateStateOnRemoteRetryableErr ensures transaction state is updated and
// a TransactionRetryWithProtoRefreshError is correctly constructed by
// UpdateStateOnRemoteRetryableErr.
func TestUpdateStateOnRemoteRetryableErr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

testCases := []struct {
err *kvpb.Error
epochBumped bool // if we expect the epoch to be bumped
newTxn bool // if we expect a new transaction in the returned error; implies an ABORT
}{
{
err: kvpb.NewError(&kvpb.ReadWithinUncertaintyIntervalError{}),
epochBumped: true,
newTxn: false,
},
{
err: kvpb.NewError(&kvpb.TransactionAbortedError{}),
epochBumped: false,
newTxn: true,
},
{
err: kvpb.NewError(&kvpb.TransactionPushError{}),
epochBumped: true,
newTxn: false,
},
{
err: kvpb.NewError(&kvpb.TransactionRetryError{}),
epochBumped: true,
newTxn: false,
},
{
err: kvpb.NewError(&kvpb.WriteTooOldError{}),
epochBumped: true,
newTxn: false,
},
}

for _, tc := range testCases {
txn := db.NewTxn(ctx, "test")
pErr := tc.err
pErr.SetTxn(txn.Sender().TestingCloneTxn())
epochBefore := txn.Epoch()
txnIDBefore := txn.ID()
err := txn.UpdateStateOnRemoteRetryableErr(ctx, pErr)
// Ensure what we got back is a TransactionRetryWithProtoRefreshError.
require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
// Ensure the same thing is stored on the TxnCoordSender as well.
retErr := txn.Sender().GetTxnRetryableErr(ctx)
require.Equal(t, retErr, err)
if tc.epochBumped {
require.Greater(t, txn.Epoch(), epochBefore)
require.Equal(t, retErr.TxnID, txnIDBefore) // transaction IDs should not have changed on us
}
if tc.newTxn {
require.NotEqual(t, retErr.Transaction.ID, txnIDBefore)
require.Equal(t, txn.Sender().TxnStatus(), roachpb.ABORTED)
}
// Lastly, ensure the TxnCoordSender was not swapped out, even if the
// transaction was aborted.
require.Equal(t, txn.Sender().TestingCloneTxn().ID, txnIDBefore)
}
}
11 changes: 10 additions & 1 deletion pkg/sql/distsql_running.go
@@ -640,6 +640,7 @@ func (dsp *DistSQLPlanner) setupFlows(
}

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially"

// Run executes a physical plan. The plan should have been finalized using
// FinalizePlan.
@@ -1582,6 +1583,12 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
return recv.commErr
}

if knobs := evalCtx.ExecCfg.DistSQLRunTestingKnobs; knobs != nil {
if fn := knobs.RunBeforeCascadesAndChecks; fn != nil {
fn(planner.Txn().ID())
}
}

dsp.PlanAndRunCascadesAndChecks(
ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
)
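A hedged sketch of the shape of the new testing knob invoked above; only the RunBeforeCascadesAndChecks field and its txn-ID argument appear in this diff, so the surrounding knobs struct and the wiring into a test server are left out as assumptions.

```go
package example

import "github.com/cockroachdb/cockroach/pkg/util/uuid"

// runBeforeCascadesAndChecks mirrors the callback signature used above: it
// receives the root transaction's ID just before cascades and checks are
// planned. A test can use this window to interfere with the transaction
// (e.g. push or abort it from elsewhere) so that creating leaf transactions
// for the checks hits a retryable error that must bubble up.
var runBeforeCascadesAndChecks = func(txnID uuid.UUID) {
	_ = txnID // placeholder; a real test would act on txnID here
}
```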
@@ -2212,7 +2219,9 @@ func (dsp *DistSQLPlanner) planAndRunChecksInParallel(
numParallelChecks--
}

log.VEventf(ctx, 2, "executing %d checks concurrently and %d checks serially", numParallelChecks, len(checkPlans)-numParallelChecks)
log.VEventf(
ctx, 2, executingParallelAndSerialChecks, numParallelChecks, len(checkPlans)-numParallelChecks,
)

// Set up a wait group so that the main (current) goroutine can block until
// all concurrent checks return. We cannot short-circuit if one of the