Skip to content

Commit b4ec5f2

Browse files
committed
sql: introduce connExecutor, the query execution orchestrator
Release note: None The connExecutor is the top dog that interfaces with a pgwire connection (through the clientComm and CommandResult interfaces), consumes a stream of queries and produces a stream of results. It encapsulates the connection state machine for which it produces events. It interfaces with the two SQL execution engines for actually running queries. Its main responsibility is to dispatch statements based on the current state (in txn, not in txn, in aborted txn, etc) and to handle execution in all states but Open (in Open it talks to an execution engine). It also handles other commands than executiong queries: preparing, binding, etc and it maintains the session state associated with prepares statements.
1 parent 505a5fb commit b4ec5f2

22 files changed

+2287
-209
lines changed

pkg/sql/advancecode_string.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/app_stats.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (a *appStats) getStrForStmt(stmt Statement) string {
164164
}
165165

166166
// sqlStats carries per-application statistics for all applications on
167-
// each node. It hangs off Executor.
167+
// each node.
168168
type sqlStats struct {
169169
st *cluster.Settings
170170
syncutil.Mutex

pkg/sql/conn_executor.go

+1,792-54
Large diffs are not rendered by default.

pkg/sql/conn_executor_utils.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2017 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package sql
16+
17+
import (
18+
"context"
19+
20+
"github.com/cockroachdb/cockroach/pkg/util/fsm"
21+
)
22+
23+
// StatementFilter2 is the type of callback that
24+
// ExecutorTestingKnobs.StatementFilter takes.
25+
// If the callback returns an error, the session is terminated.
26+
//
27+
// WIP(andrei): rename
28+
type StatementFilter2 func(ctx context.Context, query string, ev fsm.Event) error

pkg/sql/conn_fsm.go

+126-28
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,60 @@ package sql
2222

2323
import (
2424
"context"
25+
"time"
2526

2627
"github.com/cockroachdb/cockroach/pkg/roachpb"
28+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
29+
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
2730
// We dot-import fsm to use common names such as fsm.True/False. State machine
2831
// implementations using that library are weird beasts intimately inter-twined
2932
// with that package; therefor this file should stay as small as possible.
3033
. "github.com/cockroachdb/cockroach/pkg/util/fsm"
34+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3135
)
3236

3337
/// States.
3438

3539
type stateNoTxn struct{}
40+
41+
func (stateNoTxn) String() string {
42+
return "NoTxn"
43+
}
44+
3645
type stateOpen struct {
3746
ImplicitTxn Bool
3847
// RetryIntent, if set, means the user declared the intention to retry the txn
3948
// in case of retriable errors by running a SAVEPOINT cockroach_restart. The
4049
// txn will enter a RestartWait state in case of such errors.
4150
RetryIntent Bool
4251
}
52+
53+
func (stateOpen) String() string {
54+
return "Open"
55+
}
56+
4357
type stateAborted struct {
4458
// RetryIntent carries over the setting from stateOpen, in case we move back
4559
// to Open.
4660
RetryIntent Bool
4761
}
62+
63+
func (stateAborted) String() string {
64+
return "Aborted"
65+
}
66+
4867
type stateRestartWait struct{}
68+
69+
func (stateRestartWait) String() string {
70+
return "RestartWait"
71+
}
72+
4973
type stateCommitWait struct{}
5074

75+
func (stateCommitWait) String() string {
76+
return "CommitWait"
77+
}
78+
5179
func (stateNoTxn) State() {}
5280
func (stateOpen) State() {}
5381
func (stateAborted) State() {}
@@ -60,13 +88,51 @@ type eventTxnStart struct {
6088
ImplicitTxn Bool
6189
}
6290
type eventTxnStartPayload struct {
63-
tranCtx transitionCtx
91+
iso enginepb.IsolationType
92+
pri roachpb.UserPriority
93+
// txnSQLTimestamp is the timestamp that statements executed in the
94+
// transaction that is started by this event will report for now(),
95+
// current_timestamp(), transaction_timestamp().
96+
txnSQLTimestamp time.Time
97+
tranCtx transitionCtx
98+
readOnly tree.ReadWriteMode
99+
}
100+
101+
func makeEventTxnStartPayload(
102+
iso enginepb.IsolationType,
103+
pri roachpb.UserPriority,
104+
txnSQLTimestamp time.Time,
105+
tranCtx transitionCtx,
106+
readOnly tree.ReadWriteMode,
107+
) eventTxnStartPayload {
108+
return eventTxnStartPayload{
109+
iso: iso,
110+
pri: pri,
111+
readOnly: readOnly,
112+
txnSQLTimestamp: txnSQLTimestamp,
113+
tranCtx: tranCtx,
114+
}
64115
}
65116

66117
// eventRetryIntentSet is generated in the Open state when a SAVEPOINT
67118
// cockroach_restart is seen.
68119
type eventRetryIntentSet struct{}
69120
type eventTxnFinish struct{}
121+
122+
// eventTxnFinishPayload represents the payload for eventTxnFinish.
123+
type eventTxnFinishPayload struct {
124+
// commit is set if the transaction committed, false if it was aborted.
125+
commit bool
126+
}
127+
128+
// toEvent turns the eventTxnFinishPayload into a txnEvent.
129+
func (e eventTxnFinishPayload) toEvent() txnEvent {
130+
if e.commit {
131+
return txnCommit
132+
}
133+
return txnAborted
134+
}
135+
70136
type eventTxnRestart struct{}
71137

72138
type eventNonRetriableErr struct {
@@ -148,7 +214,7 @@ var TxnStateTransitions = Compile(Pattern{
148214
Action: func(args Args) error {
149215
return args.Extended.(*txnState2).noTxnToOpen(
150216
args.Ctx, args.Event.(eventTxnStart),
151-
args.Payload.(eventTxnStartPayload).tranCtx)
217+
args.Payload.(eventTxnStartPayload))
152218
},
153219
},
154220
},
@@ -161,7 +227,8 @@ var TxnStateTransitions = Compile(Pattern{
161227
Action: func(args Args) error {
162228
ts := args.Extended.(*txnState2)
163229
ts.finishSQLTxn(args.Ctx)
164-
ts.setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
230+
ts.setAdvanceInfo(
231+
advanceOne, flush, noRewind, args.Payload.(eventTxnFinishPayload).toEvent())
165232
return nil
166233
},
167234
},
@@ -190,10 +257,10 @@ var TxnStateTransitions = Compile(Pattern{
190257
Action: func(args Args) error {
191258
// The caller will call rewCap.rewindAndUnlock().
192259
args.Extended.(*txnState2).setAdvanceInfo(
193-
advanceInfo{
194-
code: rewind,
195-
rewCap: args.Payload.(eventRetriableErrPayload).rewCap,
196-
flush: false})
260+
rewind,
261+
noFlush,
262+
args.Payload.(eventRetriableErrPayload).rewCap,
263+
txnRestart)
197264
return nil
198265
},
199266
},
@@ -216,7 +283,7 @@ var TxnStateTransitions = Compile(Pattern{
216283
Action: func(args Args) error {
217284
ts := args.Extended.(*txnState2)
218285
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
219-
ts.setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
286+
ts.setAdvanceInfo(skipBatch, flush, noRewind, txnAborted)
220287
return nil
221288
},
222289
},
@@ -230,7 +297,7 @@ var TxnStateTransitions = Compile(Pattern{
230297
Action: func(args Args) error {
231298
ts := args.Extended.(*txnState2)
232299
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
233-
ts.setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
300+
ts.setAdvanceInfo(skipBatch, flush, noRewind, txnAborted)
234301
return nil
235302
},
236303
},
@@ -241,7 +308,7 @@ var TxnStateTransitions = Compile(Pattern{
241308
Action: func(args Args) error {
242309
// We flush after setting the retry intent; we know what statement
243310
// caused this event and we don't need to rewind past it.
244-
args.Extended.(*txnState2).setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
311+
args.Extended.(*txnState2).setAdvanceInfo(advanceOne, flush, noRewind, noEvent)
245312
return nil
246313
},
247314
},
@@ -252,15 +319,36 @@ var TxnStateTransitions = Compile(Pattern{
252319
Action: func(args Args) error {
253320
// Note: Preparing the KV txn for restart has already happened by this
254321
// point.
255-
args.Extended.(*txnState2).setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
322+
args.Extended.(*txnState2).setAdvanceInfo(skipBatch, flush, noRewind, txnRestart)
256323
return nil
257324
},
258325
},
259326
eventTxnReleased{}: {
260327
Description: "RELEASE SAVEPOINT cockroach_restart",
261328
Next: stateCommitWait{},
262329
Action: func(args Args) error {
263-
args.Extended.(*txnState2).setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
330+
args.Extended.(*txnState2).setAdvanceInfo(advanceOne, flush, noRewind, txnCommit)
331+
return nil
332+
},
333+
},
334+
// ROLLBACK TO SAVEPOINT
335+
eventTxnRestart{}: {
336+
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
337+
Next: stateOpen{ImplicitTxn: False, RetryIntent: True},
338+
Action: func(args Args) error {
339+
state := args.Extended.(*txnState2)
340+
// If commands have already been sent through the transaction, restart
341+
// the client txn's proto to increment the epoch.
342+
if state.mu.txn.GetTxnCoordMeta().CommandCount > 0 {
343+
// NOTE: We don't bump the txn timestamp on this restart. If this
344+
// we generally supported savepoints and one would issue a rollback to
345+
// a regular savepoint, clearly we couldn't bump the timestamp. But in
346+
// the special case of the cockroach_restart savepoint, it's not clear
347+
// to me what a user's expectation might be.
348+
state.mu.txn.Proto().Restart(
349+
0 /* userPriority */, 0 /* upgradePriority */, hlc.Timestamp{})
350+
}
351+
args.Extended.(*txnState2).setAdvanceInfo(advanceOne, flush, noRewind, noEvent)
264352
return nil
265353
},
266354
},
@@ -277,7 +365,8 @@ var TxnStateTransitions = Compile(Pattern{
277365
Action: func(args Args) error {
278366
ts := args.Extended.(*txnState2)
279367
ts.finishSQLTxn(ts.Ctx)
280-
ts.setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
368+
ts.setAdvanceInfo(
369+
advanceOne, flush, noRewind, args.Payload.(eventTxnFinishPayload).toEvent())
281370
return nil
282371
},
283372
},
@@ -293,13 +382,15 @@ var TxnStateTransitions = Compile(Pattern{
293382
ts := args.Extended.(*txnState2)
294383
ts.finishSQLTxn(args.Ctx)
295384

385+
payload := args.Payload.(eventTxnStartPayload)
386+
296387
ts.resetForNewSQLTxn(
297388
args.Ctx,
298389
explicitTxn,
299-
ts.sqlTimestamp, ts.isolation, ts.priority,
390+
payload.txnSQLTimestamp, payload.iso, payload.pri, payload.readOnly,
300391
args.Payload.(eventTxnStartPayload).tranCtx,
301392
)
302-
ts.setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
393+
ts.setAdvanceInfo(advanceOne, flush, noRewind, noEvent)
303394
return nil
304395
},
305396
},
@@ -313,7 +404,8 @@ var TxnStateTransitions = Compile(Pattern{
313404
Action: func(args Args) error {
314405
ts := args.Extended.(*txnState2)
315406
ts.finishSQLTxn(args.Ctx)
316-
ts.setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
407+
ts.setAdvanceInfo(
408+
advanceOne, flush, noRewind, args.Payload.(eventTxnFinishPayload).toEvent())
317409
return nil
318410
},
319411
},
@@ -322,7 +414,7 @@ var TxnStateTransitions = Compile(Pattern{
322414
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
323415
Next: stateOpen{ImplicitTxn: False, RetryIntent: True},
324416
Action: func(args Args) error {
325-
args.Extended.(*txnState2).setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
417+
args.Extended.(*txnState2).setAdvanceInfo(advanceOne, flush, noRewind, noEvent)
326418
return nil
327419
},
328420
},
@@ -332,7 +424,7 @@ var TxnStateTransitions = Compile(Pattern{
332424
ts := args.Extended.(*txnState2)
333425
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(eventNonRetriableErrPayload).err)
334426
ts.mu.txn = nil
335-
ts.setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
427+
ts.setAdvanceInfo(skipBatch, flush, noRewind, noEvent)
336428
return nil
337429
},
338430
},
@@ -345,7 +437,8 @@ var TxnStateTransitions = Compile(Pattern{
345437
Action: func(args Args) error {
346438
ts := args.Extended.(*txnState2)
347439
ts.finishSQLTxn(args.Ctx)
348-
ts.setAdvanceInfo(advanceInfo{code: advanceOne, flush: true})
440+
ts.setAdvanceInfo(
441+
advanceOne, flush, noRewind, args.Payload.(eventTxnFinishPayload).toEvent())
349442
return nil
350443
},
351444
},
@@ -355,7 +448,7 @@ var TxnStateTransitions = Compile(Pattern{
355448
Description: "any other statement",
356449
Next: stateAborted{RetryIntent: True},
357450
Action: func(args Args) error {
358-
args.Extended.(*txnState2).setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
451+
args.Extended.(*txnState2).setAdvanceInfo(skipBatch, flush, noRewind, noEvent)
359452
return nil
360453
},
361454
},
@@ -367,27 +460,32 @@ func cleanupAndFinish(args Args) error {
367460
ts := args.Extended.(*txnState2)
368461
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
369462
ts.finishSQLTxn(args.Ctx)
370-
ts.setAdvanceInfo(advanceInfo{code: skipQueryStr, flush: true})
463+
ts.setAdvanceInfo(skipBatch, flush, noRewind, txnAborted)
371464
return nil
372465
}
373466

374467
// noTxnToOpen implements the side effects of starting a txn. It also calls
375-
// setAdvanceInfo(), telling the execution to stayInPlace.
468+
// setAdvanceInfo().
376469
func (ts *txnState2) noTxnToOpen(
377-
connCtx context.Context, ev eventTxnStart, tranCtx transitionCtx,
470+
connCtx context.Context, ev eventTxnStart, payload eventTxnStartPayload,
378471
) error {
379472
txnTyp := explicitTxn
473+
advCode := advanceOne
380474
if ev.ImplicitTxn.Get() {
381475
txnTyp = implicitTxn
476+
// For an implicit txn, we want the statement that produced the event to be
477+
// executed again (this time in state Open).
478+
advCode = stayInPlace
382479
}
383480

384481
ts.resetForNewSQLTxn(
385482
connCtx,
386483
txnTyp,
387-
tranCtx.clock.PhysicalTime(), /* sqlTimestamp */
388-
tranCtx.defaultIsolationLevel,
389-
roachpb.NormalUserPriority,
390-
tranCtx,
484+
payload.txnSQLTimestamp,
485+
payload.iso,
486+
payload.pri,
487+
payload.readOnly,
488+
payload.tranCtx,
391489
)
392490
// When starting a transaction from the NoTxn state, we don't advance the
393491
// cursor - we want the same statement to be executed again. Note that this is
@@ -399,6 +497,6 @@ func (ts *txnState2) noTxnToOpen(
399497
// flushed (since we're not in a txn), and we shouldn't have generated any
400498
// results for the statement that generated this event (which is why we can
401499
// stayInPlace here).
402-
ts.setAdvanceInfo(advanceInfo{code: stayInPlace, flush: true})
500+
ts.setAdvanceInfo(advCode, flush, noRewind, noEvent)
403501
return nil
404502
}

0 commit comments

Comments
 (0)