Skip to content

Commit

Permalink
sql: automatically retry the first batch after a BEGIN
Browse files Browse the repository at this point in the history
Before this patch, in case of retryable errors, the server (i.e. the
Executor) would automatically retry a batch of statements if the batch
was the prefix of a transaction (or if the batch contained the whole
txn). For example, if the following SELECT would get an error, it'd be
retried if all of the following statements arrived to the server in one
batch: BEGIN; ...; SELECT foo; [... COMMIT]. The rationale in retrying
these prefixes, but not otherwise, was that, in the case of a prefix
batch, we know that the client had no conditional logic based on reads
performed in the current txn.

This patch extends this reasoning to statements executed in the first
batch arriving after the batch with the BEGIN if the BEGIN had been
trailing a previous batch (more realistically, if the BEGIN is sent
alone as a batch). As a further optimization, the SAVEPOINT statement
doesn't change the retryable character of the next range). So, if you do
something like (different lines are different batches):
BEGIN
SELECT foo;

or

BEGIN; SAVEPOINT cockroach_restart;
SELECT foo

or

BEGIN
SAVEPOINT cockroach_restart
[...;] SELECT FOO

the SELECTs will be retried automatically.

Besides being generally a good idea to hide retryable errors more, this
change was motivated by ORMs getting retryable errors from a BEGIN;
CREATE TABLE ...; COMMIT; sequence (with the BEGIN being a separate
batch). This ORM code is not under our control and we can't teach it
about user-directed retries.

This is implemented by creating a new txnState.State - FirstBatch.
Auto-retry is enabled for batches executed in this state.

Fixes cockroachdb#16450
Fixes cockroachdb#16200
See also forum discussion about it:
https://forum.cockroachlabs.com/t/automatically-retrying-the-first-batch-of-statements-after-a-begin/759
  • Loading branch information
andreimatei committed Jul 3, 2017
1 parent 7a31a9e commit 488a1ec
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (c *cliState) refreshTransactionStatus() (txnStatus string) {
txnStatus = " DONE"
case sql.RestartWait.String():
txnStatus = " RETRY"
case sql.Open.String():
case sql.Open.String(), sql.FirstBatch.String():
txnStatus = " OPEN"
}

Expand Down
96 changes: 76 additions & 20 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,8 @@ func (e *Executor) execParsed(
}

for len(stmts) > 0 {
// Each iteration consumes a transaction's worth of statements.
// Each iteration consumes a transaction's worth of statements. Any error
// that is encountered resets stmts.

inTxn := txnState.State() != NoTxn
execOpt := client.TxnExecOptions{
Expand All @@ -704,7 +705,7 @@ func (e *Executor) execParsed(
// (i.e. the next statements we're going to see are the first statements in
// a transaction).
if !inTxn {
// Detect implicit transactions.
// Detect implicit transactions - they need to be autocommitted.
if _, isBegin := stmts[0].AST.(*parser.BeginTransaction); !isBegin {
execOpt.AutoCommit = true
stmtsToExec = stmtsToExec[:1]
Expand Down Expand Up @@ -733,7 +734,8 @@ func (e *Executor) execParsed(
txnState.mu.txn.SetDebugName(sqlTxnName)
}
} else {
txnState.autoRetry = false
// If we are in a txn, the first batch get auto-retried.
txnState.autoRetry = txnState.State() == FirstBatch
}
execOpt.AutoRetry = txnState.autoRetry
if txnState.State() == NoTxn {
Expand All @@ -748,7 +750,7 @@ func (e *Executor) execParsed(
automaticRetryCount := 0
txnClosure := func(ctx context.Context, txn *client.Txn, opt *client.TxnExecOptions) error {
defer func() { automaticRetryCount++ }()
if txnState.State() == Open && txnState.mu.txn != txn {
if txnState.TxnIsOpen() && txnState.mu.txn != txn {
panic(fmt.Sprintf("closure wasn't called in the txn we set up for it."+
"\ntxnState.mu.txn:%+v\ntxn:%+v\ntxnState:%+v", txnState.mu.txn, txn, txnState))
}
Expand All @@ -768,7 +770,7 @@ func (e *Executor) execParsed(

results, remainingStmts, err = runTxnAttempt(
e, session, stmtsToExec, pinfo, origState, opt,
avoidCachedDescriptors, automaticRetryCount)
!inTxn /* txnPrefix */, avoidCachedDescriptors, automaticRetryCount)

// TODO(andrei): Until #7881 fixed.
if err == nil && txnState.State() == Aborted {
Expand Down Expand Up @@ -877,10 +879,10 @@ func (e *Executor) execParsed(
}
}

// If the txn is in any state but Open, exec the schema changes. They'll
// short-circuit themselves if the mutation that queued them has been
// rolled back from the table descriptor.
if txnState.State() != Open {
// If the txn is not in an "open" state any more, exec the schema changes.
// They'll short-circuit themselves if the mutation that queued them has
// been rolled back from the table descriptor.
if !txnState.TxnIsOpen() {
// Verify that metadata callback eventually succeeds, if one was
// set.
if e.cfg.TestingKnobs.WaitForGossipUpdate {
Expand Down Expand Up @@ -936,13 +938,17 @@ func countRowsAffected(ctx context.Context, p planNode) (int, error) {

// runTxnAttempt is used in the closure we pass to txn.Exec(). It
// will be called possibly multiple times (if opt.AutoRetry is set).
//
// txnPrefix: set if the statements represent the first batch of statements in a
// txn. Used to trap nested BEGINs.
func runTxnAttempt(
e *Executor,
session *Session,
stmts StatementList,
pinfo *parser.PlaceholderInfo,
origState TxnStateEnum,
opt *client.TxnExecOptions,
txnPrefix bool,
avoidCachedDescriptors bool,
automaticRetryCount int,
) ([]Result, StatementList, error) {
Expand All @@ -959,7 +965,7 @@ func runTxnAttempt(

results, remainingStmts, err := e.execStmtsInCurrentTxn(
session, stmts, pinfo, opt.AutoCommit, /* implicitTxn */
opt.AutoRetry /* txnBeginning */, avoidCachedDescriptors, automaticRetryCount)
txnPrefix, avoidCachedDescriptors, automaticRetryCount)

if opt.AutoCommit && len(remainingStmts) > 0 {
panic("implicit txn failed to execute all stmts")
Expand Down Expand Up @@ -992,6 +998,8 @@ func runTxnAttempt(
// a transaction).
// COMMIT/ROLLBACK statements are rejected if set. Also, the transaction
// might be auto-committed in this function.
// txnPrefix: set if the statements represent the first batch of statements in a
// txn. Used to trap nested BEGINs.
// avoidCachedDescriptors: set if the statement execution should avoid
// using cached descriptors.
// automaticRetryCount: increases with each retry; 0 for the first attempt.
Expand All @@ -1010,7 +1018,7 @@ func (e *Executor) execStmtsInCurrentTxn(
stmts StatementList,
pinfo *parser.PlaceholderInfo,
implicitTxn bool,
txnBeginning bool,
txnPrefix bool,
avoidCachedDescriptors bool,
automaticRetryCount int,
) ([]Result, StatementList, error) {
Expand Down Expand Up @@ -1051,9 +1059,9 @@ func (e *Executor) execStmtsInCurrentTxn(
res, err = runShowTransactionState(session, implicitTxn)
} else {
switch txnState.State() {
case Open:
case Open, FirstBatch:
res, err = e.execStmtInOpenTxn(
session, stmt, pinfo, implicitTxn, txnBeginning && (i == 0), /* firstInTxn */
session, stmt, pinfo, implicitTxn, txnPrefix && (i == 0), /* firstInTxn */
avoidCachedDescriptors, automaticRetryCount)
case Aborted, RestartWait:
res, err = e.execStmtInAbortedTxn(session, stmt)
Expand Down Expand Up @@ -1096,6 +1104,11 @@ func getTransactionState(txnState *txnState, implicitTxn bool) string {
if implicitTxn {
state = NoTxn
}
// For the purposes of representing the states to client, make the FirstBatch
// state look like Open.
if state == FirstBatch {
state = Open
}
return state.String()
}

Expand Down Expand Up @@ -1157,8 +1170,8 @@ func (e *Executor) execStmtInAbortedTxn(session *Session, stmt Statement) (Resul
return Result{}, err
}
if txnState.State() == RestartWait {
// Reset the state. Txn is Open again.
txnState.SetState(Open)
// Reset the state to FirstBatch. We're in an "open" txn again.
txnState.SetState(FirstBatch)
// TODO(andrei/cdo): add a counter for user-directed retries.
return Result{}, nil
}
Expand Down Expand Up @@ -1222,7 +1235,8 @@ func sessionEventf(session *Session, format string, args ...interface{}) {
// It binds placeholders.
//
// The current transaction might be committed/rolled back when this returns.
// It might also have transitioned to the aborted or RestartWait state.
// It might also transition to the aborted or RestartWait state, and it
// might transition from FirstBatch to Open.
//
// Args:
// session: the session to execute the statement in.
Expand Down Expand Up @@ -1252,7 +1266,7 @@ func (e *Executor) execStmtInOpenTxn(
automaticRetryCount int,
) (_ Result, err error) {
txnState := &session.TxnState
if txnState.State() != Open {
if !txnState.TxnIsOpen() {
panic("execStmtInOpenTxn called outside of an open txn")
}

Expand All @@ -1263,12 +1277,18 @@ func (e *Executor) execStmtInOpenTxn(
e.updateStmtCounts(stmt)
}

// After the statement is executed, we might have to do state transitions.
defer func() {
if err != nil {
if txnState.State() != Open {
if !txnState.TxnIsOpen() {
panic(fmt.Sprintf("unexpected txnState when cleaning up: %v", txnState.State()))
}
txnState.updateStateAndCleanupOnErr(err, e)
} else if txnState.State() == FirstBatch &&
!canStayInFirstBatchState(stmt) {
// Transition from FirstBatch to Open except in the case of special
// statements that don't return results to the client.
txnState.SetState(Open)
}
}()

Expand Down Expand Up @@ -1439,7 +1459,7 @@ func stmtAllowedInImplicitTxn(stmt Statement) bool {

// rollbackSQLTransaction rolls back a transaction. All errors are swallowed.
func rollbackSQLTransaction(txnState *txnState) Result {
if txnState.State() != Open && txnState.State() != RestartWait {
if !txnState.TxnIsOpen() && txnState.State() != RestartWait {
panic(fmt.Sprintf("rollbackSQLTransaction called on txn in wrong state: %s (txn: %s)",
txnState.State(), txnState.mu.txn.Proto()))
}
Expand All @@ -1463,7 +1483,7 @@ const (

// commitSqlTransaction commits a transaction.
func commitSQLTransaction(txnState *txnState, commitType commitType) (Result, error) {
if txnState.State() != Open {
if !txnState.TxnIsOpen() {
panic(fmt.Sprintf("commitSqlTransaction called on non-open txn: %+v", txnState.mu.txn))
}
if commitType == commit {
Expand Down Expand Up @@ -1964,6 +1984,42 @@ func isAsOf(session *Session, stmt parser.Statement, max hlc.Timestamp) (*hlc.Ti
return &ts, err
}

// isSavepoint returns true if stmt is a SAVEPOINT statement.
func isSavepoint(stmt Statement) bool {
_, isSavepoint := stmt.AST.(*parser.Savepoint)
return isSavepoint
}

// isBegin returns true if stmt is a BEGIN statement.
func isBegin(stmt Statement) bool {
_, isBegin := stmt.AST.(*parser.BeginTransaction)
return isBegin
}

// isSetTransaction returns true if stmt is a "SET TRANSACTION ..." statement.
func isSetTransaction(stmt Statement) bool {
_, isSet := stmt.AST.(*parser.SetTransaction)
return isSet
}

// isRollbackToSavepoint returns true if stmt is a "ROLLBACK TO SAVEPOINT"
// statement.
func isRollbackToSavepoint(stmt Statement) bool {
_, isSet := stmt.AST.(*parser.RollbackToSavepoint)
return isSet
}

// canStayInFirstBatchState returns true if the statement can leave the
// transaction in the FirstBatch state (as opposed to transitioning it to Open).
func canStayInFirstBatchState(stmt Statement) bool {
return isBegin(stmt) ||
isSavepoint(stmt) ||
isSetTransaction(stmt) ||
// ROLLBACK TO SAVEPOINT does its own state transitions; if it leave the
// transaction in the FirstBatch state, don't mess with it.
isRollbackToSavepoint(stmt)
}

// convertToErrWithPGCode recognizes errs that should have SQL error codes to be
// reported to the client and converts err to them. If this doesn't apply, err
// is returned.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/manual_retry
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ SELECT CRDB_INTERNAL.FORCE_RETRY('50ms':::INTERVAL)
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart

# The SELECT 1 is necessary to take the session out of the FirstBatch state,
# otherwise the statement below would be retries automatically.
statement ok
SELECT 1

query error restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry()
SELECT CRDB_INTERNAL.FORCE_RETRY('500ms':::INTERVAL)

Expand Down
81 changes: 79 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/txn
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ SELECT * FROM kv
a c
x y

# Two BEGINs in a row.

statement ok
BEGIN TRANSACTION

statement error there is already a transaction in progress
BEGIN TRANSACTION

statement ok
ROLLBACK TRANSACTION

# BEGIN in the middle of a transaction is an error.

statement ok
Expand Down Expand Up @@ -527,8 +538,10 @@ statement ok
COMMIT

# RestartWait state
# The SELECT 1 is necessary to move the txn out of the FirstBatch state,
# otherwise the next statement is automatically retried on the server.
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1

query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry()
SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL)
Expand All @@ -549,6 +562,68 @@ Open
statement ok
COMMIT

# Automatic retries for the first batch.
statement ok
BEGIN TRANSACTION; SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL)

statement ok
ROLLBACK

# Automatic retries for the first batch even when that first batch comes after
# the BEGIN.
statement ok
BEGIN TRANSACTION;

statement ok
SELECT 1; SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL)

statement ok
ROLLBACK

# Automatic retries for the first batch even when that first batch comes after
# the BEGIN and the BEGIN also has special statements that don't move the txn
# state out of the "FirstBatch" state.
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SET TRANSACTION PRIORITY NORMAL

statement ok
SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL)

statement ok
ROLLBACK

# Like above, but the SAVEPOINT is its own batch.
statement ok
BEGIN TRANSACTION

statement ok
SAVEPOINT cockroach_restart;

statement ok
SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL)

statement ok
ROLLBACK


# Automatic retries for the first batch after an explicit restart.
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1;

query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry()
SELECT CRDB_INTERNAL.FORCE_RETRY('50ms':::INTERVAL)

statement ok
ROLLBACK TO SAVEPOINT COCKROACH_RESTART;

# This is the automatic retry we care about.
statement ok
SELECT CRDB_INTERNAL.FORCE_RETRY('100ms':::INTERVAL)

statement ok
ROLLBACK


# General savepoints
statement ok
BEGIN TRANSACTION
Expand Down Expand Up @@ -651,8 +726,10 @@ BEGIN ISOLATION LEVEL SERIALIZABLE, ISOLATION LEVEL SERIALIZABLE

# Retryable error in a txn that hasn't performed any KV operations. It used to
# not work.
# The SELECT 1 is necessary to take the session out of the FirstBatch state,
# otherwise the statement below would be retries automatically.
statement ok
BEGIN
BEGIN; SELECT 1

query error pgcode 40001 restart transaction: HandledRetryableTxnError: forced by crdb_internal.force_retry()
SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL)
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,15 @@ func TestAbortCountConflictingWrites(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// Run a batch of statements to move the txn out of the FirstBatch state,
// otherwise the INSERT below would be automatically retried.
if _, err := txn.Exec("SELECT 1"); err != nil {
t.Fatal(err)
}

_, err = txn.Exec("INSERT INTO db.t VALUES ('key', 'marker')")
if !testutils.IsError(err, "aborted") {
t.Fatal(err)
t.Fatalf("expected aborted error, got: %v", err)
}

if err = txn.Rollback(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/pgwire/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (c *v3Conn) serve(ctx context.Context, draining func() bool, reserved mon.B
// We send status "InFailedTransaction" also for state RestartWait
// because GO's lib/pq freaks out if we invent a new status.
txnStatus = 'E'
case sql.Open:
case sql.Open, sql.FirstBatch:
txnStatus = 'T'
case sql.NoTxn:
// We're not in a txn (i.e. the last txn was committed).
Expand Down
Loading

0 comments on commit 488a1ec

Please sign in to comment.