diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index 137afa1a5836..296f0ae5edd6 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -427,7 +427,7 @@ func (c *cliState) refreshTransactionStatus() (txnStatus string) { txnStatus = " DONE" case sql.RestartWait.String(): txnStatus = " RETRY" - case sql.Open.String(): + case sql.Open.String(), sql.FirstBatch.String(): txnStatus = " OPEN" } diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index 711bc232641f..6acb9ee3322f 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -687,7 +687,8 @@ func (e *Executor) execParsed( } for len(stmts) > 0 { - // Each iteration consumes a transaction's worth of statements. + // Each iteration consumes a transaction's worth of statements. Any error + // that is encountered resets stmts. inTxn := txnState.State() != NoTxn execOpt := client.TxnExecOptions{ @@ -704,7 +705,7 @@ func (e *Executor) execParsed( // (i.e. the next statements we're going to see are the first statements in // a transaction). if !inTxn { - // Detect implicit transactions. + // Detect implicit transactions - they need to be autocommitted. if _, isBegin := stmts[0].AST.(*parser.BeginTransaction); !isBegin { execOpt.AutoCommit = true stmtsToExec = stmtsToExec[:1] @@ -733,7 +734,8 @@ func (e *Executor) execParsed( txnState.mu.txn.SetDebugName(sqlTxnName) } } else { - txnState.autoRetry = false + // If we are in a txn, the first batch gets auto-retried. 
+ txnState.autoRetry = txnState.State() == FirstBatch } execOpt.AutoRetry = txnState.autoRetry if txnState.State() == NoTxn { @@ -748,7 +750,7 @@ func (e *Executor) execParsed( automaticRetryCount := 0 txnClosure := func(ctx context.Context, txn *client.Txn, opt *client.TxnExecOptions) error { defer func() { automaticRetryCount++ }() - if txnState.State() == Open && txnState.mu.txn != txn { + if txnState.TxnIsOpen() && txnState.mu.txn != txn { panic(fmt.Sprintf("closure wasn't called in the txn we set up for it."+ "\ntxnState.mu.txn:%+v\ntxn:%+v\ntxnState:%+v", txnState.mu.txn, txn, txnState)) } @@ -768,7 +770,7 @@ func (e *Executor) execParsed( results, remainingStmts, err = runTxnAttempt( e, session, stmtsToExec, pinfo, origState, opt, - avoidCachedDescriptors, automaticRetryCount) + !inTxn /* txnPrefix */, avoidCachedDescriptors, automaticRetryCount) // TODO(andrei): Until #7881 fixed. if err == nil && txnState.State() == Aborted { @@ -877,10 +879,10 @@ func (e *Executor) execParsed( } } - // If the txn is in any state but Open, exec the schema changes. They'll - // short-circuit themselves if the mutation that queued them has been - // rolled back from the table descriptor. - if txnState.State() != Open { + // If the txn is not in an "open" state any more, exec the schema changes. + // They'll short-circuit themselves if the mutation that queued them has + // been rolled back from the table descriptor. + if !txnState.TxnIsOpen() { // Verify that metadata callback eventually succeeds, if one was // set. if e.cfg.TestingKnobs.WaitForGossipUpdate { @@ -936,6 +938,9 @@ func countRowsAffected(ctx context.Context, p planNode) (int, error) { // runTxnAttempt is used in the closure we pass to txn.Exec(). It // will be called possibly multiple times (if opt.AutoRetry is set). +// +// txnPrefix: set if the statements represent the first batch of statements in a +// txn. Used to trap nested BEGINs. 
func runTxnAttempt( e *Executor, session *Session, @@ -943,6 +948,7 @@ func runTxnAttempt( pinfo *parser.PlaceholderInfo, origState TxnStateEnum, opt *client.TxnExecOptions, + txnPrefix bool, avoidCachedDescriptors bool, automaticRetryCount int, ) ([]Result, StatementList, error) { @@ -959,7 +965,7 @@ func runTxnAttempt( results, remainingStmts, err := e.execStmtsInCurrentTxn( session, stmts, pinfo, opt.AutoCommit, /* implicitTxn */ - opt.AutoRetry /* txnBeginning */, avoidCachedDescriptors, automaticRetryCount) + txnPrefix, avoidCachedDescriptors, automaticRetryCount) if opt.AutoCommit && len(remainingStmts) > 0 { panic("implicit txn failed to execute all stmts") @@ -992,6 +998,8 @@ func runTxnAttempt( // a transaction). // COMMIT/ROLLBACK statements are rejected if set. Also, the transaction // might be auto-committed in this function. +// txnPrefix: set if the statements represent the first batch of statements in a +// txn. Used to trap nested BEGINs. // avoidCachedDescriptors: set if the statement execution should avoid // using cached descriptors. // automaticRetryCount: increases with each retry; 0 for the first attempt. 
@@ -1010,7 +1018,7 @@ func (e *Executor) execStmtsInCurrentTxn( stmts StatementList, pinfo *parser.PlaceholderInfo, implicitTxn bool, - txnBeginning bool, + txnPrefix bool, avoidCachedDescriptors bool, automaticRetryCount int, ) ([]Result, StatementList, error) { @@ -1051,9 +1059,9 @@ func (e *Executor) execStmtsInCurrentTxn( res, err = runShowTransactionState(session, implicitTxn) } else { switch txnState.State() { - case Open: + case Open, FirstBatch: res, err = e.execStmtInOpenTxn( - session, stmt, pinfo, implicitTxn, txnBeginning && (i == 0), /* firstInTxn */ + session, stmt, pinfo, implicitTxn, txnPrefix && (i == 0), /* firstInTxn */ avoidCachedDescriptors, automaticRetryCount) case Aborted, RestartWait: res, err = e.execStmtInAbortedTxn(session, stmt) @@ -1096,6 +1104,11 @@ func getTransactionState(txnState *txnState, implicitTxn bool) string { if implicitTxn { state = NoTxn } + // For the purposes of representing the states to client, make the FirstBatch + // state look like Open. + if state == FirstBatch { + state = Open + } return state.String() } @@ -1157,8 +1170,8 @@ func (e *Executor) execStmtInAbortedTxn(session *Session, stmt Statement) (Resul return Result{}, err } if txnState.State() == RestartWait { - // Reset the state. Txn is Open again. - txnState.SetState(Open) + // Reset the state to FirstBatch. We're in an "open" txn again. + txnState.SetState(FirstBatch) // TODO(andrei/cdo): add a counter for user-directed retries. return Result{}, nil } @@ -1222,7 +1235,8 @@ func sessionEventf(session *Session, format string, args ...interface{}) { // It binds placeholders. // // The current transaction might be committed/rolled back when this returns. -// It might also have transitioned to the aborted or RestartWait state. +// It might also transition to the aborted or RestartWait state, and it +// might transition from FirstBatch to Open. // // Args: // session: the session to execute the statement in. 
@@ -1252,7 +1266,7 @@ func (e *Executor) execStmtInOpenTxn( automaticRetryCount int, ) (_ Result, err error) { txnState := &session.TxnState - if txnState.State() != Open { + if !txnState.TxnIsOpen() { panic("execStmtInOpenTxn called outside of an open txn") } @@ -1263,12 +1277,18 @@ func (e *Executor) execStmtInOpenTxn( e.updateStmtCounts(stmt) } + // After the statement is executed, we might have to do state transitions. defer func() { if err != nil { - if txnState.State() != Open { + if !txnState.TxnIsOpen() { panic(fmt.Sprintf("unexpected txnState when cleaning up: %v", txnState.State())) } txnState.updateStateAndCleanupOnErr(err, e) + } else if txnState.State() == FirstBatch && + !canStayInFirstBatchState(stmt) { + // Transition from FirstBatch to Open except in the case of special + // statements that don't return results to the client. + txnState.SetState(Open) } }() @@ -1439,7 +1459,7 @@ func stmtAllowedInImplicitTxn(stmt Statement) bool { // rollbackSQLTransaction rolls back a transaction. All errors are swallowed. func rollbackSQLTransaction(txnState *txnState) Result { - if txnState.State() != Open && txnState.State() != RestartWait { + if !txnState.TxnIsOpen() && txnState.State() != RestartWait { panic(fmt.Sprintf("rollbackSQLTransaction called on txn in wrong state: %s (txn: %s)", txnState.State(), txnState.mu.txn.Proto())) } @@ -1463,7 +1483,7 @@ const ( // commitSqlTransaction commits a transaction. func commitSQLTransaction(txnState *txnState, commitType commitType) (Result, error) { - if txnState.State() != Open { + if !txnState.TxnIsOpen() { panic(fmt.Sprintf("commitSqlTransaction called on non-open txn: %+v", txnState.mu.txn)) } if commitType == commit { @@ -1964,6 +1984,42 @@ func isAsOf(session *Session, stmt parser.Statement, max hlc.Timestamp) (*hlc.Ti return &ts, err } +// isSavepoint returns true if stmt is a SAVEPOINT statement. 
+func isSavepoint(stmt Statement) bool { + _, isSavepoint := stmt.AST.(*parser.Savepoint) + return isSavepoint +} + +// isBegin returns true if stmt is a BEGIN statement. +func isBegin(stmt Statement) bool { + _, isBegin := stmt.AST.(*parser.BeginTransaction) + return isBegin +} + +// isSetTransaction returns true if stmt is a "SET TRANSACTION ..." statement. +func isSetTransaction(stmt Statement) bool { + _, isSet := stmt.AST.(*parser.SetTransaction) + return isSet +} + +// isRollbackToSavepoint returns true if stmt is a "ROLLBACK TO SAVEPOINT" +// statement. +func isRollbackToSavepoint(stmt Statement) bool { + _, isSet := stmt.AST.(*parser.RollbackToSavepoint) + return isSet +} + +// canStayInFirstBatchState returns true if the statement can leave the +// transaction in the FirstBatch state (as opposed to transitioning it to Open). +func canStayInFirstBatchState(stmt Statement) bool { + return isBegin(stmt) || + isSavepoint(stmt) || + isSetTransaction(stmt) || + // ROLLBACK TO SAVEPOINT does its own state transitions; if it leaves the + // transaction in the FirstBatch state, don't mess with it. + isRollbackToSavepoint(stmt) +} + // convertToErrWithPGCode recognizes errs that should have SQL error codes to be // reported to the client and converts err to them. If this doesn't apply, err // is returned. diff --git a/pkg/sql/logictest/testdata/logic_test/manual_retry b/pkg/sql/logictest/testdata/logic_test/manual_retry index 2bf1e95122c0..e6afb1fc4824 100644 --- a/pkg/sql/logictest/testdata/logic_test/manual_retry +++ b/pkg/sql/logictest/testdata/logic_test/manual_retry @@ -10,6 +10,11 @@ SELECT CRDB_INTERNAL.FORCE_RETRY('50ms':::INTERVAL) statement ok BEGIN TRANSACTION; SAVEPOINT cockroach_restart +# The SELECT 1 is necessary to take the session out of the FirstBatch state, +# otherwise the statement below would be retried automatically. 
+statement ok +SELECT 1 + query error restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry() SELECT CRDB_INTERNAL.FORCE_RETRY('500ms':::INTERVAL) diff --git a/pkg/sql/logictest/testdata/logic_test/txn b/pkg/sql/logictest/testdata/logic_test/txn index 6092c8f4030a..80158eea6fc6 100644 --- a/pkg/sql/logictest/testdata/logic_test/txn +++ b/pkg/sql/logictest/testdata/logic_test/txn @@ -139,6 +139,17 @@ SELECT * FROM kv a c x y +# Two BEGINs in a row. + +statement ok +BEGIN TRANSACTION + +statement error there is already a transaction in progress +BEGIN TRANSACTION + +statement ok +ROLLBACK TRANSACTION + # BEGIN in the middle of a transaction is an error. statement ok @@ -527,8 +538,10 @@ statement ok COMMIT # RestartWait state +# The SELECT 1 is necessary to move the txn out of the FirstBatch state, +# otherwise the next statement is automatically retried on the server. statement ok -BEGIN TRANSACTION; SAVEPOINT cockroach_restart +BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1 query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry() SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL) @@ -549,6 +562,68 @@ Open statement ok COMMIT +# Automatic retries for the first batch. +statement ok +BEGIN TRANSACTION; SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL) + +statement ok +ROLLBACK + +# Automatic retries for the first batch even when that first batch comes after +# the BEGIN. +statement ok +BEGIN TRANSACTION; + +statement ok +SELECT 1; SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL) + +statement ok +ROLLBACK + +# Automatic retries for the first batch even when that first batch comes after +# the BEGIN and the BEGIN also has special statements that don't move the txn +# state out of the "FirstBatch" state. 
+statement ok +BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SET TRANSACTION PRIORITY NORMAL + +statement ok +SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL) + +statement ok +ROLLBACK + +# Like above, but the SAVEPOINT is its own batch. +statement ok +BEGIN TRANSACTION + +statement ok +SAVEPOINT cockroach_restart; + +statement ok +SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL) + +statement ok +ROLLBACK + + +# Automatic retries for the first batch after an explicit restart. +statement ok +BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1; + +query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry() +SELECT CRDB_INTERNAL.FORCE_RETRY('50ms':::INTERVAL) + +statement ok +ROLLBACK TO SAVEPOINT COCKROACH_RESTART; + +# This is the automatic retry we care about. +statement ok +SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL) + +statement ok +ROLLBACK + + # General savepoints statement ok BEGIN TRANSACTION @@ -651,8 +726,10 @@ BEGIN ISOLATION LEVEL SERIALIZABLE, ISOLATION LEVEL SERIALIZABLE # Retryable error in a txn that hasn't performed any KV operations. It used to # not work. +# The SELECT 1 is necessary to take the session out of the FirstBatch state, +# otherwise the statement below would be retried automatically. statement ok -BEGIN +BEGIN; SELECT 1 query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry() SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL) diff --git a/pkg/sql/metric_test.go b/pkg/sql/metric_test.go index 6ceb9153f326..8f84589ae529 100644 --- a/pkg/sql/metric_test.go +++ b/pkg/sql/metric_test.go @@ -161,9 +161,15 @@ func TestAbortCountConflictingWrites(t *testing.T) { if err != nil { t.Fatal(err) } + // Run a batch of statements to move the txn out of the FirstBatch state, + // otherwise the INSERT below would be automatically retried. 
+ if _, err := txn.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + _, err = txn.Exec("INSERT INTO db.t VALUES ('key', 'marker')") if !testutils.IsError(err, "aborted") { - t.Fatal(err) + t.Fatalf("expected aborted error, got: %v", err) } if err = txn.Rollback(); err != nil { diff --git a/pkg/sql/pgwire/v3.go b/pkg/sql/pgwire/v3.go index ab2f34ae86ec..98ed5491de2f 100644 --- a/pkg/sql/pgwire/v3.go +++ b/pkg/sql/pgwire/v3.go @@ -387,7 +387,7 @@ func (c *v3Conn) serve(ctx context.Context, draining func() bool, reserved mon.B // We send status "InFailedTransaction" also for state RestartWait // because GO's lib/pq freaks out if we invent a new status. txnStatus = 'E' - case sql.Open: + case sql.Open, sql.FirstBatch: txnStatus = 'T' case sql.NoTxn: // We're not in a txn (i.e. the last txn was committed). diff --git a/pkg/sql/session.go b/pkg/sql/session.go index bf3867e5d225..60bd2aa73c55 100644 --- a/pkg/sql/session.go +++ b/pkg/sql/session.go @@ -732,8 +732,27 @@ const ( // Executor opens implicit transactions before executing non-transactional // queries. NoTxn TxnStateEnum = iota + + // A txn is in scope, and we're currently executing statements from the first + // batch of statements using the transaction. If BEGIN was the last (or the + // only) statement in a batch, that batch doesn't count (the next batch will + // be considered the first one). The first batch of statements can be + // automatically retried in case of retryable errors since there's been no + // client logic relying on reads performed in the transaction. + // + // A BEGIN statement makes the transaction enter this state (from a previous + // NoTxn state). The end of the first batch of statements, if executed + // successfully, will move the state to Open. + // + // TODO(andrei): It'd be cool if exiting this state would be based not on + // batches sent by the client, but results being sent by the server to the + // client (i.e. 
the client can send 100 batches but, if we haven't sent it any + // results yet, we know that we can still retry them all). + FirstBatch + // A txn is in scope. Open + // The txn has encountered a (non-retriable) error. // Statements will be rejected until a COMMIT/ROLLBACK is seen. Aborted @@ -747,7 +766,7 @@ const ( // Some states mean that a client.Txn is open, others don't. func (s TxnStateEnum) kvTxnIsOpen() bool { - return s == Open || s == RestartWait + return s == Open || s == FirstBatch || s == RestartWait } // txnState contains state associated with an ongoing SQL txn. @@ -822,6 +841,12 @@ func (ts *txnState) SetState(val TxnStateEnum) { atomic.StoreInt64((*int64)(&ts.state), int64(val)) } +// TxnIsOpen returns true if we are presently inside a SQL txn, and the txn is +// not in an error state. +func (ts *txnState) TxnIsOpen() bool { + return ts.State() == Open || ts.State() == FirstBatch +} + // resetForNewSQLTxn (re)initializes the txnState for a new transaction. // It creates a new client.Txn and initializes it using the session defaults. // txnState.State will be set to Open. @@ -888,7 +913,7 @@ func (ts *txnState) resetForNewSQLTxn(e *Executor, s *Session, implicitTxn bool) ts.sp = sp ts.Ctx = ctx - ts.SetState(Open) + ts.SetState(FirstBatch) s.Tracing.onNewSQLTxn(ts.sp) ts.mon.Start(ctx, &s.mon, mon.BoundAccount{}) diff --git a/pkg/sql/txn.go b/pkg/sql/txn.go index 446a58feaeb2..c799d9306fd4 100644 --- a/pkg/sql/txn.go +++ b/pkg/sql/txn.go @@ -28,7 +28,14 @@ func (p *planner) BeginTransaction(n *parser.BeginTransaction) (planNode, error) if p.txn == nil { return nil, errors.Errorf("the server should have already created a transaction") } - return &emptyNode{}, p.setTransactionModes(n.Modes) + if err := p.setTransactionModes(n.Modes); err != nil { + return nil, err + } + + // Enter the FirstBatch state. 
+ p.session.TxnState.SetState(FirstBatch) + + return &emptyNode{}, nil } // SetTransaction sets a transaction's isolation level diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index 8528e99db7c5..b0a1fd820d93 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -568,6 +568,12 @@ BEGIN; t.Fatal(err) } + // Run a batch of statements to move the txn out of the FirstBatch state, + // otherwise the INSERT below would be automatically retried. + if _, err := sqlDB.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + // Continue the txn in a new request, which is not retriable. _, err := sqlDB.Exec("INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp())") if !testutils.IsError( @@ -684,6 +690,12 @@ func runTestTxn( tx *gosql.Tx, sentinelInsert string, ) bool { + // Run a bogus statement to disable the automatic server retries of subsequent + // statements. + if _, err := tx.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + retriesNeeded := (magicVals.restartCounts["boulanger"] + magicVals.abortCounts["boulanger"]) > 0 if retriesNeeded { @@ -1045,6 +1057,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); if _, err := tx.Exec("SAVEPOINT cockroach_restart"); err != nil { t.Fatal(err) } + // Run a batch of statements to move the txn out of the FirstBatch state, + // otherwise the INSERT below would be automatically retried. + if _, err := tx.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + if _, err := tx.Exec(insertStmt); err != nil { t.Fatal(err) } @@ -1077,6 +1095,12 @@ func TestUnexpectedStatementInRestartWait(t *testing.T) { if _, err := tx.Exec("SAVEPOINT cockroach_restart"); err != nil { t.Fatal(err) } + // Run a batch of statements to move the txn out of the FirstBatch state, + // otherwise the SELECT below would be automatically retried. 
+ if _, err := tx.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + if _, err := tx.Exec( "SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL)"); !testutils.IsError( err, `forced by crdb_internal\.force_retry\(\)`) { @@ -1366,7 +1390,7 @@ func TestDistSQLRetryableError(t *testing.T) { db.SetMaxOpenConns(1) - if _, err := db.Exec("SET DISTSQL = ALWAYS"); err != nil { + if _, err := db.Exec("SET DISTSQL = ON"); err != nil { t.Fatal(err) } @@ -1386,6 +1410,20 @@ func TestDistSQLRetryableError(t *testing.T) { if err != nil { t.Fatal(err) } + // Run a batch of statements to move the txn out of the "FirstBatch" state. + if _, err := txn.Exec("SELECT 1"); err != nil { + t.Fatal(err) + } + + // Let's make sure that DISTSQL will actually be used. + row := txn.QueryRow("SELECT automatic FROM [EXPLAIN (DISTSQL) SELECT COUNT(1) FROM t]") + var automatic bool + if err := row.Scan(&automatic); err != nil { + t.Fatal(err) + } + if !automatic { + t.Fatal("DISTSQL not used for test's query") + } _, err = txn.Exec("SELECT COUNT(1) FROM t") if !restarted { diff --git a/pkg/sql/txnstateenum_string.go b/pkg/sql/txnstateenum_string.go index 07b8dbd9fc2c..dd332ab72daa 100644 --- a/pkg/sql/txnstateenum_string.go +++ b/pkg/sql/txnstateenum_string.go @@ -4,9 +4,9 @@ package sql import "fmt" -const _TxnStateEnum_name = "NoTxnOpenAbortedRestartWaitCommitWait" +const _TxnStateEnum_name = "NoTxnFirstBatchOpenAbortedRestartWaitCommitWait" -var _TxnStateEnum_index = [...]uint8{0, 5, 9, 16, 27, 37} +var _TxnStateEnum_index = [...]uint8{0, 5, 15, 19, 26, 37, 47} func (i TxnStateEnum) String() string { if i < 0 || i >= TxnStateEnum(len(_TxnStateEnum_index)-1) {