Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: attempt txn auto-commit before flushing txnResults #22721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 54 additions & 51 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,23 +873,23 @@ func (e *Executor) execParsed(
//
// Args:
// stmtsToExec: A prefix of these will be executed. The remaining ones will be
// returned as remainingStmts.
// returned as remainingStmts.
// txnPrefix: Set if stmtsToExec corresponds to the start of the current
// transaction. Used to trap nested BEGINs.
// transaction. Used to trap nested BEGINs.
// autoCommit: If set, the transaction will be committed after running the
// statement. If set, stmtsToExec can only contain a single statement.
// If set, the transaction state will always be NoTxn when this function
// returns, regardless of errors.
// Errors encountered when committing are reported to the caller and are
// indistinguishable from errors encountered while running the query.
// protoTS: If not nil, the transaction proto sets its Orig and Max timestamps
// to it each retry.
// to it each retry.
//
// Returns:
// remainingStmts: all the statements that were not executed.
// transitionToOpen: specifies if the caller should move from state AutoRetry to
// state Open. This will be false if the state is not AutoRetry when this
// returns.
// state Open. This will be false if the state is not AutoRetry when this
// returns.
// err: An error that occurred while executing the queries.
func runWithAutoRetry(
e *Executor,
Expand Down Expand Up @@ -943,8 +943,8 @@ func runWithAutoRetry(

// Run some statements.
remainingStmts, transitionToOpen, err = runTxnAttempt(
e, session, stmtsToExec, pinfo, origState,
txnPrefix, asOfSystemTime, avoidCachedDescriptors, automaticRetryCount, txnState.txnResults)
e, session, stmtsToExec, pinfo, origState, txnPrefix, autoCommit,
asOfSystemTime, avoidCachedDescriptors, automaticRetryCount, txnState.txnResults)

// Sanity checks.
if err != nil && txnState.TxnIsOpen() {
Expand All @@ -967,48 +967,7 @@ func runWithAutoRetry(
}
}

// Check if we need to auto-commit. If so, we end the transaction now; the
// transaction was only supposed to exist for the statement that we just
// ran.
if autoCommit {
if err == nil {
txn := txnState.mu.txn
if txn == nil {
log.Fatalf(session.Ctx(), "implicit txn returned with no error and yet "+
"the kv txn is gone. No state transition should have done that. State: %s",
txnState.State())
}

// We were told to autoCommit. The KV txn might already be committed
// (planNodes are free to do that when running an implicit transaction,
// and some try to do it to take advantage of 1-PC txns). If it is, then
// there's nothing to do. If it isn't, then we commit it here.
//
// NOTE(andrei): It bothers me some that we're peeking at txn to figure
// out whether we committed or not, where SQL could already know that -
// individual statements could report this back.
if txn.IsAborted() {
log.Fatalf(session.Ctx(), "#7881: the statement we just ran didn't generate an error "+
"but the txn proto is aborted. This should never happen. txn: %+v",
txn)
}

if !txn.IsCommitted() {
var skipCommit bool
if e.cfg.TestingKnobs.BeforeAutoCommit != nil {
err = e.cfg.TestingKnobs.BeforeAutoCommit(session.Ctx(), stmtsToExec[0].String())
skipCommit = err != nil
}
if !skipCommit {
err = txn.Commit(session.Ctx())
}
log.Eventf(session.Ctx(), "AutoCommit. err: %v\ntxn: %+v", err, txn.Proto())
if err != nil {
err = txnState.updateStateAndCleanupOnErr(err, e)
}
}
}

// After autoCommit, unless we're in RestartWait, we leave the transaction
// in NoTxn, regardless of whether we executed the query successfully or
// we encountered an error.
Expand Down Expand Up @@ -1065,21 +1024,24 @@ func runWithAutoRetry(
//
// Args:
// txnPrefix: set if the start of the batch corresponds to the start of the
// current transaction. Used to trap nested BEGINs.
// current transaction. Used to trap nested BEGINs.
// autoCommit: If set, the transaction will be committed after running the
// statement.
// txnResults: used to push query results.
//
// It returns:
// remainingStmts: all the statements that were not executed.
// transitionToOpen: specifies if the caller should move from state AutoRetry to
// state Open. This will be false if the state is not AutoRetry when this
// returns.
// state Open. This will be false if the state is not AutoRetry when this
// returns.
func runTxnAttempt(
e *Executor,
session *Session,
stmts StatementList,
pinfo *tree.PlaceholderInfo,
origState TxnStateEnum,
txnPrefix bool,
autoCommit bool,
asOfSystemTime bool,
avoidCachedDescriptors bool,
automaticRetryCount int,
Expand Down Expand Up @@ -1131,6 +1093,47 @@ func runTxnAttempt(
}
}

// Check if we need to auto-commit. If so, we end the transaction now; the
// transaction was only supposed to exist for the statement that we just
// ran. This needs to happen before the txnResults Flush below.
if autoCommit {
txn := txnState.mu.txn
if txn == nil {
log.Fatalf(session.Ctx(), "implicit txn returned with no error and yet "+
"the kv txn is gone. No state transition should have done that. State: %s",
txnState.State())
}

// We were told to autoCommit. The KV txn might already be committed
// (planNodes are free to do that when running an implicit transaction,
// and some try to do it to take advantage of 1-PC txns). If it is, then
// there's nothing to do. If it isn't, then we commit it here.
//
// NOTE(andrei): It bothers me some that we're peeking at txn to figure
// out whether we committed or not, where SQL could already know that -
// individual statements could report this back.
if txn.IsAborted() {
log.Fatalf(session.Ctx(), "#7881: the statement we just ran didn't generate an error "+
"but the txn proto is aborted. This should never happen. txn: %+v",
txn)
}

if !txn.IsCommitted() {
if filter := e.cfg.TestingKnobs.BeforeAutoCommit; filter != nil {
if err := filter(session.Ctx(), stmts[0].String()); err != nil {
err = txnState.updateStateAndCleanupOnErr(err, e)
return nil, false, err
}
}
err := txn.Commit(session.Ctx())
log.Eventf(session.Ctx(), "AutoCommit. err: %v\ntxn: %+v", err, txn.Proto())
if err != nil {
err = txnState.updateStateAndCleanupOnErr(err, e)
return nil, false, err
}
}
}

// Flush the results accumulated in AutoRetry. We want possible future
// Reset()s to not discard them since we're not going to retry the
// statements. We also want future ResultsSentToClient() calls to return
Expand Down
46 changes: 32 additions & 14 deletions pkg/sql/txn_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,22 +499,22 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT, t DECIMAL);
}, false)

if err := aborter.QueueStmtForAbortion(
"INSERT INTO t.test(k, v, t) VALUES (1, 'boulanger', cluster_logical_timestamp())", 2 /* abortCount */, true, /* willBeRetriedIbid */
"INSERT INTO t.test(k, v, t) VALUES (1, 'boulanger', cluster_logical_timestamp()) RETURNING 1", 2 /* abortCount */, true, /* willBeRetriedIbid */
); err != nil {
t.Fatal(err)
}
if err := aborter.QueueStmtForAbortion(
"INSERT INTO t.test(k, v, t) VALUES (2, 'dromedary', cluster_logical_timestamp())", 2 /* abortCount */, true, /* willBeRetriedIbid */
"INSERT INTO t.test(k, v, t) VALUES (2, 'dromedary', cluster_logical_timestamp()) RETURNING 1", 2 /* abortCount */, true, /* willBeRetriedIbid */
); err != nil {
t.Fatal(err)
}
if err := aborter.QueueStmtForAbortion(
"INSERT INTO t.test(k, v, t) VALUES (3, 'fajita', cluster_logical_timestamp())", 2 /* abortCount */, true, /* willBeRetriedIbid */
"INSERT INTO t.test(k, v, t) VALUES (3, 'fajita', cluster_logical_timestamp()) RETURNING 1", 2 /* abortCount */, true, /* willBeRetriedIbid */
); err != nil {
t.Fatal(err)
}
if err := aborter.QueueStmtForAbortion(
"INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp())", 2 /* abortCount */, true, /* willBeRetriedIbid */
"INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp()) RETURNING 1", 2 /* abortCount */, true, /* willBeRetriedIbid */
); err != nil {
t.Fatal(err)
}
Expand All @@ -534,20 +534,38 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT, t DECIMAL);
// TODO(knz): This test can be made more robust by exposing the
// current allocation count in monitor and checking that it has the
// same value at the beginning of each retry.
if _, err := sqlDB.Exec(`
INSERT INTO t.test(k, v, t) VALUES (1, 'boulanger', cluster_logical_timestamp());
rows, err := sqlDB.Query(`
INSERT INTO t.test(k, v, t) VALUES (1, 'boulanger', cluster_logical_timestamp()) RETURNING 1;
BEGIN;
SELECT * FROM t.test;
INSERT INTO t.test(k, v, t) VALUES (2, 'dromedary', cluster_logical_timestamp());
INSERT INTO t.test(k, v, t) VALUES (3, 'fajita', cluster_logical_timestamp());
INSERT INTO t.test(k, v, t) VALUES (2, 'dromedary', cluster_logical_timestamp()) RETURNING 1;
INSERT INTO t.test(k, v, t) VALUES (3, 'fajita', cluster_logical_timestamp()) RETURNING 1;
END;
INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp());
INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp()) RETURNING 1;
BEGIN;
INSERT INTO t.test(k, v, t) VALUES (5, 'josephine', cluster_logical_timestamp());
INSERT INTO t.test(k, v, t) VALUES (6, 'laureal', cluster_logical_timestamp());
`); err != nil {
INSERT INTO t.test(k, v, t) VALUES (5, 'josephine', cluster_logical_timestamp()) RETURNING 1;
INSERT INTO t.test(k, v, t) VALUES (6, 'laureal', cluster_logical_timestamp()) RETURNING 1;
`)
if err != nil {
t.Fatal(err)
}
defer rows.Close()

resSets := 0
for {
for rows.Next() {
resSets++
}
if !rows.NextResultSet() {
break
}
}
if err := rows.Err(); err != nil {
t.Fatal(err)
}
if resSets != 6 {
t.Fatalf("Expected 6 result sets, got %d", resSets)
}

cleanupFilter()

checkRestarts(t, magicVals)
Expand Down Expand Up @@ -596,7 +614,7 @@ BEGIN;
}

// Continue the txn in a new request, which is not retriable.
_, err := sqlDB.Exec("INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp())")
_, err = sqlDB.Exec("INSERT INTO t.test(k, v, t) VALUES (4, 'hooly', cluster_logical_timestamp())")
if !testutils.IsError(
err, "RETRY_POSSIBLE_REPLAY") {
t.Errorf("didn't get expected injected error. Got: %v", err)
Expand Down