Skip to content

Commit

Permalink
sql,plpgsql: add setting to give cursors default WITH HOLD behavior
Browse files Browse the repository at this point in the history
This patch introduces a new setting, `close_cursors_at_commit`, which causes
cursors opened using a PL/pgSQL OPEN statement to remain open once the calling
transaction commits. This is similar to oracle behavior, and will be useful
for enabling migrations.

As part of this change, the `sqlCursors.closeAll` method has been expanded to
take in the reason for closing as a parameter. It now uses the following rules:
* If the reason for closing is txn commit, non-HOLD cursors are closed.
* If the reason for closing is txn rollback, all cursors created by the current
  transaction are closed.
* If the reason for closing is an explicit CLOSE ALL or the session closing,
  all cursors are closed unconditionally.

Note that the behavior has not changed for SQL cursors - if a SQL cursor is
declared using `WITH HOLD` and is open at txn commit, an error will result.

Informs #77101

Release note (sql change): Introduced a new setting, `close_cursors_at_commit`,
which causes a cursor to remain open even after its calling transaction
commits. Note that transaction rollback still closes any cursor created in
that transaction.
  • Loading branch information
DrewKimball committed Jan 18, 2024
1 parent 624885c commit c2c97e1
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 34 deletions.
130 changes: 130 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/plpgsql_cursor
Original file line number Diff line number Diff line change
Expand Up @@ -1923,3 +1923,133 @@ CREATE OR REPLACE FUNCTION f(curs STRING) RETURNS INT AS $$
RETURN 0;
END
$$ LANGUAGE PLpgSQL;

subtest hold_setting

# Test the open_plpgsql_cursor_with_hold setting, which causes PL/pgSQL cursors
# to remain open after their transaction closes.
statement ok
CREATE PROCEDURE make_curs(curs REFCURSOR) AS $$
BEGIN
OPEN curs FOR SELECT 1, 2;
END
$$ LANGUAGE PLpgSQL;

# The cursors should be closed when the implicit transaction closes.
statement ok
CALL make_curs('foo');
CALL make_curs('bar');

query T
SELECT name FROM pg_cursors;
----

# The cursors should be visible within the explicit transcation.
statement ok
BEGIN;
CALL make_curs('foo');
CALL make_curs('bar');

query T rowsort
SELECT name FROM pg_cursors;
----
foo
bar

# The cursors should be closed when the explicit transaction commits.
statement ok
COMMIT;

query T
SELECT name FROM pg_cursors;
----

# The cursors should still be visible even through they were opened within
# implicit transactions.
statement ok
SET close_cursors_at_commit = false;
CALL make_curs('foo');
CALL make_curs('bar');

query T rowsort
SELECT name FROM pg_cursors;
----
foo
bar

query II
FETCH FROM foo;
----
1 2

# A CLOSE statement should still close the open cursors.
statement ok
CLOSE ALL;

query T
SELECT name FROM pg_cursors;
----

# Cursors should remain open after the explicit transaction commits.
statement ok
BEGIN;
CALL make_curs('foo');
CALL make_curs('bar');
COMMIT;

query T rowsort
SELECT name FROM pg_cursors;
----
foo
bar

query II
FETCH FROM bar;
----
1 2

statement ok
CLOSE ALL;

# Cursors should be rolled back if opened in a transaction that is rolled back.
statement ok
BEGIN;
CALL make_curs('foo');
CALL make_curs('bar');
ROLLBACK;

query T
SELECT name FROM pg_cursors;
----

# A preexisting cursor should not be rolled back when a transaction aborts.
statement ok
CALL make_curs('foo');

statement ok
BEGIN;
CALL make_curs('bar');
ROLLBACK;

query T
SELECT name FROM pg_cursors;
----
foo

query II
FETCH FROM foo;
----
1 2

# The preexisting cursor should still be closed by CLOSE ALL.
statement ok
CLOSE ALL;

query T
SELECT name FROM pg_cursors;
----

statement ok
RESET close_cursors_at_commit;

subtest end
28 changes: 28 additions & 0 deletions pkg/sql/buffer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ func (c *rowContainerHelper) InitWithDedup(
c.scratch = make(rowenc.EncDatumRow, len(typs))
}

// InitWithParentMon is a variant of Init that allows the parent memory monitor
// to be specified. This is useful when the container should not be owned by the
// current transaction (e.g. a SQL cursor that lives on the session).
func (c *rowContainerHelper) InitWithParentMon(
ctx context.Context,
typs []*types.T,
parent *mon.BytesMonitor,
evalContext *extendedEvalContext,
opName redact.RedactableString,
) {
distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
// TODO(yuzefovich): currently the memory usage of c.memMonitor doesn't
// count against sql.mem.distsql.current metric. Fix it.
c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
ctx, parent, distSQLCfg, evalContext.SessionData(),
redact.Sprintf("%s-limited", opName),
)
c.diskMonitor = execinfra.NewMonitor(
ctx, distSQLCfg.ParentDiskMonitor, redact.Sprintf("%s-disk", opName),
)
c.rows = &rowcontainer.DiskBackedRowContainer{}
c.rows.Init(
colinfo.NoOrdering, typs, &evalContext.Context,
distSQLCfg.TempStorage, c.memMonitor, c.diskMonitor,
)
c.scratch = make(rowenc.EncDatumRow, len(typs))
}

func (c *rowContainerHelper) initMonitors(
ctx context.Context, evalContext *extendedEvalContext, opName redact.RedactableString,
) {
Expand Down
24 changes: 16 additions & 8 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
if err := ex.extraTxnState.sqlCursors.closeAll(cursorCloseForExplicitClose); err != nil {
log.Warningf(ctx, "error closing cursors: %v", err)
}

Expand Down Expand Up @@ -1472,10 +1472,12 @@ type connExecutor struct {
// connExecutor's closure.
prepStmtsNamespaceMemAcc mon.BoundAccount

// sqlCursors contains the list of SQL CURSORs the session currently has
// sqlCursors contains the list of SQL cursors the session currently has
// access to.
// Cursors are bound to an explicit transaction and they're all destroyed
// once the transaction finishes.
//
// Cursors declared WITH HOLD belong to the session, and can outlive their
// parent transaction. Otherwise, cursors are bound to their transaction,
// and are destroyed when the transaction finishes.
sqlCursors cursorMap

// shouldExecuteOnTxnFinish indicates that ex.onTxnFinish will be called
Expand Down Expand Up @@ -1997,8 +1999,14 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

// Close all cursors.
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
// Close all (non HOLD) cursors.
var closeReason cursorCloseReason
if ev.eventType == txnCommit {
closeReason = cursorCloseForTxnCommit
} else {
closeReason = cursorCloseForTxnRollback
}
if err := ex.extraTxnState.sqlCursors.closeAll(closeReason); err != nil {
log.Warningf(ctx, "error closing cursors: %v", err)
}

Expand Down Expand Up @@ -2466,7 +2474,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
// txnState.finishSQLTxn() is being called, as the underlying resources of
// pausable portals hasn't been cleared yet.
ex.extraTxnState.prepStmtsNamespace.closeAllPausablePortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
if err := ex.extraTxnState.sqlCursors.closeAll(cursorCloseForTxnRollback); err != nil {
log.Warningf(ctx, "error closing cursors: %v", err)
}
}
Expand Down Expand Up @@ -3709,7 +3717,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
p.resetPlanner(ctx, txn, stmtTS, ex.sessionData(), ex.state.mon)
p.resetPlanner(ctx, txn, stmtTS, ex.sessionData(), ex.state.mon, ex.sessionMon)
autoRetryReason := ex.state.mu.autoRetryReason
// If we are retrying due to an unsatisfiable timestamp bound which is
// retriable, it means we were unable to serve the previous minimum timestamp
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,7 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) (retEr
ex.recordDDLTxnTelemetry(failed)
}()

if err := ex.extraTxnState.sqlCursors.closeAll(true /* errorOnWithHold */); err != nil {
if err := ex.extraTxnState.sqlCursors.closeAll(cursorCloseForTxnCommit); err != nil {
return err
}

Expand Down Expand Up @@ -1581,7 +1581,7 @@ func (ex *connExecutor) createJobs(ctx context.Context) error {
func (ex *connExecutor) rollbackSQLTransaction(
ctx context.Context, stmt tree.Statement,
) (fsm.Event, fsm.EventPayload) {
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
if err := ex.extraTxnState.sqlCursors.closeAll(cursorCloseForTxnRollback); err != nil {
return ex.makeErrEvent(err, stmt)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3718,6 +3718,10 @@ func (m *sessionDataMutator) SetDistSQLPlanGatewayBias(val int64) {
m.data.DistsqlPlanGatewayBias = val
}

func (m *sessionDataMutator) SetCloseCursorsAtCommit(val bool) {
m.data.CloseCursorsAtCommit = val
}

// Utility functions related to scrubbing sensitive information on SQL Stats.

// quantizeCounts ensures that the Count field in the
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4193,12 +4193,12 @@ https://www.postgresql.org/docs/14/view-pg-cursors.html`,
return err
}
if err := addRow(
tree.NewDString(string(name)), /* name */
tree.NewDString(c.statement), /* statement */
tree.DBoolFalse, /* is_holdable */
tree.DBoolFalse, /* is_binary */
tree.DBoolFalse, /* is_scrollable */
tz, /* creation_date */
tree.NewDString(string(name)), /* name */
tree.NewDString(c.statement), /* statement */
tree.MakeDBool(tree.DBool(c.withHold)), /* is_holdable */
tree.DBoolFalse, /* is_binary */
tree.DBoolFalse, /* is_scrollable */
tz, /* creation_date */
); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,17 @@ type planner struct {
innerPlansMustUseLeafTxn int32
}

// monitor tracks the memory usage of txn-bound objects - for example,
// execution operators.
monitor *mon.BytesMonitor

// sessionMonitor tracks the memory of session-bound objects. It currently
// only used internally for tracking SQL cursors declared using WITH HOLD.
//
// NOTE: sessionMonitor is unset for queries that are not associated with a
// session (e.g. internal queries).
sessionMonitor *mon.BytesMonitor

// Corresponding Statement for this query.
stmt Statement

Expand Down Expand Up @@ -875,11 +884,13 @@ func (p *planner) resetPlanner(
stmtTS time.Time,
sd *sessiondata.SessionData,
plannerMon *mon.BytesMonitor,
sessionMon *mon.BytesMonitor,
) {
p.txn = txn
p.stmt = Statement{}
p.instrumentation = instrumentationHelper{}
p.monitor = plannerMon
p.sessionMonitor = sessionMon

p.cancelChecker.Reset(ctx)

Expand Down
19 changes: 16 additions & 3 deletions pkg/sql/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ func (g *routineGenerator) startInternal(ctx context.Context, txn *kv.Txn) (err
w = rrw
} else if openCursor {
// The result of the first statement will be used to open a SQL cursor.
cursorHelper, err = g.newCursorHelper(plan.(*planComponents))
cursorSQL := g.expr.CursorDeclaration.CursorSQL
cursorHelper, err = g.newCursorHelper(plan.(*planComponents), cursorSQL)
if err != nil {
return err
}
Expand Down Expand Up @@ -450,7 +451,9 @@ func (d *droppingResultWriter) Err() error {
return d.err
}

func (g *routineGenerator) newCursorHelper(plan *planComponents) (*plpgsqlCursorHelper, error) {
func (g *routineGenerator) newCursorHelper(
plan *planComponents, sql string,
) (*plpgsqlCursorHelper, error) {
open := g.expr.CursorDeclaration
if open.NameArgIdx < 0 || open.NameArgIdx >= len(g.args) {
panic(errors.AssertionFailedf("unexpected name argument index: %d", open.NameArgIdx))
Expand All @@ -471,11 +474,20 @@ func (g *routineGenerator) newCursorHelper(plan *planComponents) (*plpgsqlCursor
ctx: context.Background(),
cursorName: cursorName,
resultCols: make(colinfo.ResultColumns, len(planCols)),
cursorSql: sql,
}
copy(cursorHelper.resultCols, planCols)
cursorHelper.container.Init(
mon := g.p.Mon()
if !g.p.SessionData().CloseCursorsAtCommit {
mon = g.p.sessionMonitor
if mon == nil {
return nil, errors.AssertionFailedf("cannot open cursor WITH HOLD without an active session")
}
}
cursorHelper.container.InitWithParentMon(
cursorHelper.ctx,
getTypesFromResultColumns(planCols),
mon,
g.p.ExtendedEvalContextCopy(),
"routine_open_cursor", /* opName */
)
Expand Down Expand Up @@ -509,6 +521,7 @@ func (h *plpgsqlCursorHelper) createCursor(p *planner, blockState *tree.BlockSta
txn: p.txn,
statement: h.cursorSql,
created: timeutil.Now(),
withHold: !p.SessionData().CloseCursorsAtCommit,
eagerExecution: true,
}
if err := p.checkIfCursorExists(h.cursorName); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ message LocalOnlySessionData {
// CopyNumRetriesPerBatch determines the number of times a single batch of
// rows can be retried for non-atomic COPY.
int32 copy_num_retries_per_batch = 120;
// CloseCursorsAtCommit determines whether cursors remain open after their
// parent transaction closes.
bool close_cursors_at_commit = 121;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
Loading

0 comments on commit c2c97e1

Please sign in to comment.