From 0461b0069fcfd9eeb055d1023c4c08874a9b2f8f Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Wed, 15 Mar 2023 16:14:31 -0400 Subject: [PATCH] sql,kv: bubble up retry errors when creating leaf transactions Previously, if we detected that the transaction was aborted when trying to construct leaf transaction state, we would handle the retry error instead of bubbling it up to the caller. When a transaction is aborted, the `TransactionRetryWithProtoRefreshError` carries with it a new transaction that should be used for subsequent attempts. Handling the retry error entailed swapping out the old `TxnCoordSender` with a new one -- one that is associated with this new transaction. This is bug prone when trying to create multiple leaf transactions in parallel if the root has been aborted. We would expect the first leaf transaction to handle the error and all subsequent leaf transactions to point to the new transaction, as the `TxnCoordSender` has been swapped out. This wasn't an issue before as we never really created multiple leaf transactions in parallel. This recently change in 0f4b431af0ea0a13643b5fe8c6a172bf37a73a98, which started parallelizing FK and uniqueness checks. With this change, we could see FK or uniqueness violations when in fact the transaction needed to be retried. This patch fixes the issue described above by not handling the retry error when creating leaf transactions. Instead, we expect the ConnExecutor to retry the entire transaction and prepare it for another iteration. Fixes #97141 Epic: none Release note: None --- pkg/kv/kvpb/data.go | 7 ++ pkg/kv/txn.go | 30 ++--- pkg/kv/txn_external_test.go | 69 ++++++++++ pkg/sql/distsql_running.go | 11 +- pkg/sql/distsql_running_test.go | 196 +++++++++++++++++++++++++++++ pkg/sql/execinfra/server_config.go | 6 + 6 files changed, 300 insertions(+), 19 deletions(-) diff --git a/pkg/kv/kvpb/data.go b/pkg/kv/kvpb/data.go index 940cccf5c077..e1f2964b06fa 100644 --- a/pkg/kv/kvpb/data.go +++ b/pkg/kv/kvpb/data.go @@ -102,6 +102,13 @@ func PrepareTransactionForRetry( case *WriteTooOldError: // Increase the timestamp to the ts at which we've actually written. txn.WriteTimestamp.Forward(tErr.RetryTimestamp()) + case *IntentMissingError: + // IntentMissingErrors are not expected to be handled at this level; + // We instead expect the txnPipeliner to transform them into a + // TransactionRetryErrors(RETRY_ASYNC_WRITE_FAILURE) error. + log.Fatalf( + ctx, "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail(), + ) default: log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) } diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 7f0914a5bc33..d5e6a1355b9c 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -993,7 +993,8 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) { } log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", txn.debugNameLocked(), retryErr) - txn.handleRetryableErrLocked(ctx, retryErr) + txn.resetDeadlineLocked() + txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1073,13 +1074,6 @@ func (txn *Txn) Send( return br, pErr } -func (txn *Txn) handleRetryableErrLocked( - ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError, -) { - txn.resetDeadlineLocked() - txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) -} - // NegotiateAndSend is a specialized version of Send that is capable of // orchestrating a bounded-staleness read through the transaction, given a // read-only BatchRequest with a min_timestamp_bound set in its Header. @@ -1251,10 +1245,11 @@ func (txn *Txn) GetLeafTxnInputState(ctx context.Context) *roachpb.LeafTxnInputS // GetLeafTxnInputStateOrRejectClient is like GetLeafTxnInputState // except, if the transaction is already aborted or otherwise in state -// that cannot make progress, it returns an error. If the transaction -// is aborted, the error will be a retryable one, and the transaction -// will have been prepared for another transaction attempt (so, on -// retryable errors, it acts like Send()). +// that cannot make progress, it returns an error. If the transaction aborted +// the error returned will be a retryable one; as such, the caller is +// responsible for handling the error before another attempt by calling +// PrepareForRetry. Use of the transaction before doing so will continue to be +// rejected. func (txn *Txn) GetLeafTxnInputStateOrRejectClient( ctx context.Context, ) (*roachpb.LeafTxnInputState, error) { @@ -1267,10 +1262,6 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient( defer txn.mu.Unlock() tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending) if err != nil { - var retryErr *kvpb.TransactionRetryWithProtoRefreshError - if errors.As(err, &retryErr) { - txn.handleRetryableErrLocked(ctx, retryErr) - } return nil, err } return tfs, nil @@ -1339,8 +1330,6 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb. } pErr = txn.mu.sender.UpdateStateOnRemoteRetryableErr(ctx, pErr) - txn.replaceRootSenderIfTxnAbortedLocked(ctx, pErr.GetDetail().(*kvpb.TransactionRetryWithProtoRefreshError), origTxnID) - return pErr.GoError() } @@ -1350,6 +1339,11 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb. // // origTxnID is the id of the txn that generated retryErr. Note that this can be // different from retryErr.Transaction - the latter might be a new transaction. +// +// TODO(arul): Now that we only expect this to happen on the PrepareForRetry +// path, by design, should we just inline this function? Some of the handling +// of non-aborted transactions in this function feels a bit out of place with +// the new code structure. func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError, origTxnID uuid.UUID, ) { diff --git a/pkg/kv/txn_external_test.go b/pkg/kv/txn_external_test.go index fbfeccf4ccf0..659d64403ee2 100644 --- a/pkg/kv/txn_external_test.go +++ b/pkg/kv/txn_external_test.go @@ -703,3 +703,72 @@ func TestGenerateForcedRetryableErrorByPoisoning(t *testing.T) { checkKey(t, "a", 1) checkKey(t, "b", 2) } + +// TestUpdateStateOnRemoteRetryableErr ensures transaction state is updated and +// a TransactionRetryWithProtoRefreshError is correctly constructed by +// UpdateStateOnRemoteRetryableError. +func TestUpdateStateOnRemoteRetryableErr(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + testCases := []struct { + err *kvpb.Error + epochBumped bool // if we expect the epoch to be bumped + newTxn bool // if we expect a new transaction in the returned error; implies to an ABORT + }{ + { + err: kvpb.NewError(&kvpb.ReadWithinUncertaintyIntervalError{}), + epochBumped: true, + newTxn: false, + }, + { + err: kvpb.NewError(&kvpb.TransactionAbortedError{}), + epochBumped: false, + newTxn: true, + }, + { + err: kvpb.NewError(&kvpb.TransactionPushError{}), + epochBumped: true, + newTxn: false, + }, + { + err: kvpb.NewError(&kvpb.TransactionRetryError{}), + epochBumped: true, + newTxn: false, + }, + { + err: kvpb.NewError(&kvpb.WriteTooOldError{}), + epochBumped: true, + newTxn: false, + }, + } + + for _, tc := range testCases { + txn := db.NewTxn(ctx, "test") + pErr := tc.err + pErr.SetTxn(txn.Sender().TestingCloneTxn()) + epochBefore := txn.Epoch() + txnIDBefore := txn.ID() + err := txn.UpdateStateOnRemoteRetryableErr(ctx, pErr) + // Ensure what we got back is a TransactionRetryWithProtoRefreshError. + require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err) + // Ensure the same thing is stored on the TxnCoordSender as well. + retErr := txn.Sender().GetTxnRetryableErr(ctx) + require.Equal(t, retErr, err) + if tc.epochBumped { + require.Greater(t, txn.Epoch(), epochBefore) + require.Equal(t, retErr.TxnID, txnIDBefore) // transaction IDs should not have changed on us + } + if tc.newTxn { + require.NotEqual(t, retErr.Transaction.ID, txnIDBefore) + require.Equal(t, txn.Sender().TxnStatus(), roachpb.ABORTED) + } + // Lastly, ensure the TxnCoordSender was not swapped out, even if the + // transaction was aborted. + require.Equal(t, txn.Sender().TestingCloneTxn().ID, txnIDBefore) + } +} diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index eb07b2b32e45..8b66e1d3b41d 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -640,6 +640,7 @@ func (dsp *DistSQLPlanner) setupFlows( } const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan" +const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially" // Run executes a physical plan. The plan should have been finalized using // FinalizePlan. @@ -1582,6 +1583,12 @@ func (dsp *DistSQLPlanner) PlanAndRunAll( return recv.commErr } + if knobs := evalCtx.ExecCfg.DistSQLRunTestingKnobs; knobs != nil { + if fn := knobs.RunBeforeCascadesAndChecks; fn != nil { + fn(planner.Txn().ID()) + } + } + dsp.PlanAndRunCascadesAndChecks( ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv, ) @@ -2212,7 +2219,9 @@ func (dsp *DistSQLPlanner) planAndRunChecksInParallel( numParallelChecks-- } - log.VEventf(ctx, 2, "executing %d checks concurrently and %d checks serially", numParallelChecks, len(checkPlans)-numParallelChecks) + log.VEventf( + ctx, 2, executingParallelAndSerialChecks, numParallelChecks, len(checkPlans)-numParallelChecks, + ) // Set up a wait group so that the main (current) goroutine can block until // all concurrent checks return. We cannot short-circuit if one of the diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index de4b2400b039..edc76cee5cf9 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/datadriven" @@ -197,6 +198,201 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { } } +// TestDistSQLRunningParallelFKChecksAfterAbort simulates a SQL transaction +// that writes two rows required to validate a FK check and then proceeds to +// write a third row that would actually trigger this check. The transaction is +// aborted after the third row is written but before the FK check is performed. +// We assert that this construction doesn't throw a FK violation; instead, the +// transaction should be able to retry. +// This test serves as a regression test for the hazard identified in +// https://github.com/cockroachdb/cockroach/issues/97141. +func TestDistSQLRunningParallelFKChecksAfterAbort(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + mu := struct { + syncutil.Mutex + abortTxn func(uuid uuid.UUID) + }{} + + s, conn, db := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + RunBeforeCascadesAndChecks: func(txnID uuid.UUID) { + mu.Lock() + defer mu.Unlock() + if mu.abortTxn != nil { + mu.abortTxn(txnID) + } + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(conn) + + // Set up schemas for the test. We want a construction that results in 2 FK + // checks, of which 1 is done in parallel. + sqlDB.Exec(t, "create database test") + sqlDB.Exec(t, "create table test.parent1(a INT PRIMARY KEY)") + sqlDB.Exec(t, "create table test.parent2(b INT PRIMARY KEY)") + sqlDB.Exec( + t, + "create table test.child(a INT, b INT, FOREIGN KEY (a) REFERENCES test.parent1(a), FOREIGN KEY (b) REFERENCES test.parent2(b))", + ) + key := roachpb.Key("a") + + setupQueries := []string{ + "insert into test.parent1 VALUES(1)", + "insert into test.parent2 VALUES(2)", + } + query := "insert into test.child VALUES(1, 2)" + + createPlannerAndRunQuery := func(ctx context.Context, txn *kv.Txn, query string) error { + execCfg := s.ExecutorConfig().(ExecutorConfig) + // Plan the statement. + internalPlanner, cleanup := NewInternalPlanner( + "test", + txn, + username.RootUserName(), + &MemoryMetrics{}, + &execCfg, + sessiondatapb.SessionData{}, + ) + defer cleanup() + p := internalPlanner.(*planner) + stmt, err := parser.ParseOne(query) + require.NoError(t, err) + + rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error { + return nil + }) + recv := MakeDistSQLReceiver( + ctx, + rw, + stmt.AST.StatementReturnType(), + execCfg.RangeDescriptorCache, + txn, + execCfg.Clock, + p.ExtendedEvalContext().Tracing, + ) + + p.stmt = makeStatement(stmt, clusterunique.ID{}) + if err := p.makeOptimizerPlan(ctx); err != nil { + t.Fatal(err) + } + defer p.curPlan.close(ctx) + + evalCtx := p.ExtendedEvalContext() + planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, txn, DistributionTypeNone) + planCtx.stmtType = recv.stmtType + + evalCtxFactory := func(bool) *extendedEvalContext { + factoryEvalCtx := extendedEvalContext{Tracing: evalCtx.Tracing} + factoryEvalCtx.Context = evalCtx.Context + return &factoryEvalCtx + } + err = execCfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, p, recv, evalCtxFactory) + if err != nil { + return err + } + return rw.Err() + } + + push := func(ctx context.Context, key roachpb.Key) error { + // Conflicting transaction that pushes another transaction. + conflictTxn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) + // We need to explicitly set a high priority for the push to happen. + if err := conflictTxn.SetUserPriority(roachpb.MaxUserPriority); err != nil { + return err + } + // Push through a Put, as opposed to a Get, so that the pushee gets aborted. + if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil { + return err + } + err := conflictTxn.Commit(ctx) + require.NoError(t, err) + t.Log(conflictTxn.Rollback(ctx)) + return err + } + + // Make a db with a short heartbeat interval, so that the aborted txn finds + // out quickly. + ambient := s.AmbientCtx() + tsf := kvcoord.NewTxnCoordSenderFactory( + kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + // Short heartbeat interval. + HeartbeatInterval: time.Millisecond, + Settings: s.ClusterSettings(), + Clock: s.Clock(), + Stopper: s.Stopper(), + }, + s.DistSenderI().(*kvcoord.DistSender), + ) + shortDB := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper()) + + iter := 0 + // We'll trace to make sure the test isn't fooling itself. + tr := s.TracerI().(*tracing.Tracer) + runningCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test") + defer getRecAndFinish() + err := shortDB.Txn(runningCtx, func(ctx context.Context, txn *kv.Txn) error { + iter++ + + // set up the test. + for _, query := range setupQueries { + err := createPlannerAndRunQuery(ctx, txn, query) + require.NoError(t, err) + } + + if iter == 1 { + // On the first iteration, abort the txn by setting the abortTxn function. + mu.Lock() + mu.abortTxn = func(txnID uuid.UUID) { + if txnID != txn.ID() { + return // not our txn + } + if err := txn.Put(ctx, key, "val"); err != nil { + t.Fatal(err) + } + if err := push(ctx, key); err != nil { + t.Fatal(err) + } + // Now wait until the heartbeat loop notices that the transaction is aborted. + testutils.SucceedsSoon(t, func() error { + if txn.Sender().(*kvcoord.TxnCoordSender).IsTracking() { + return fmt.Errorf("txn heartbeat loop running") + } + return nil + }) + } + mu.Unlock() + defer func() { + // clear the abortTxn function before returning. + mu.Lock() + mu.abortTxn = nil + mu.Unlock() + }() + } + + // Execute the FK checks. + return createPlannerAndRunQuery(ctx, txn, query) + }) + if err != nil { + t.Fatal(err) + } + require.Equal(t, iter, 2) + if tracing.FindMsgInRecording(getRecAndFinish(), clientRejectedMsg) == -1 { + t.Fatalf("didn't find expected message in trace: %s", clientRejectedMsg) + } + concurrentFKChecksLogMessage := fmt.Sprintf(executingParallelAndSerialChecks, 1, 1) + if tracing.FindMsgInRecording(getRecAndFinish(), concurrentFKChecksLogMessage) == -1 { + t.Fatalf("didn't find expected message in trace: %s", concurrentFKChecksLogMessage) + } +} + // Test that the DistSQLReceiver overwrites previous errors as "better" errors // come along. func TestDistSQLReceiverErrorRanking(t *testing.T) { diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 82267d185c6d..ae1975c9cd9e 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/marusama/semaphore" ) @@ -298,6 +299,11 @@ type TestingKnobs struct { // when responding to SetupFlow RPCs, after the flow is set up but before it // is started. SetupFlowCb func(context.Context, base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error + + // RunBeforeCascadeAndChecks is run before any cascade or check queries are + // run. The associated transaction ID of the statement performing the cascade + // or check query is passed in as an argument. + RunBeforeCascadesAndChecks func(txnID uuid.UUID) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.