-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathexecutor.go
1807 lines (1646 loc) · 61.5 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tamir Duberstein ([email protected])
// Author: Andrei Matei ([email protected])
package sql
import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
// Sentinel errors returned by the SQL executor.
var (
	errNoTransactionInProgress = errors.New("there is no transaction in progress")
	errNoTransactionToPipeline = errors.New("statement pipelining is only allowed in a transaction")
	errStaleMetadata           = errors.New("metadata is still stale")
	errTransactionInProgress   = errors.New("there is already a transaction in progress")
	errStmtFollowsSchemaChange = errors.New("statement cannot follow a schema change in a transaction")
)

const (
	// sqlTxnName is the debug name given to KV transactions backing explicit
	// SQL transactions.
	sqlTxnName string = "sql txn"
	// sqlImplicitTxnName is the debug name given to KV transactions backing
	// implicit (auto-committed) SQL transactions.
	sqlImplicitTxnName string = "sql txn implicit"
	// metricsSampleInterval is the base interval from which the latency
	// histogram windows are derived.
	metricsSampleInterval = 10 * time.Second
)
// Fully-qualified names for metrics.
// Each Metadata value names one of the counters or latency histograms that
// NewExecutor creates for an Executor.
var (
MetaTxnBegin = metric.Metadata{Name: "sql.txn.begin.count"}
MetaTxnCommit = metric.Metadata{Name: "sql.txn.commit.count"}
MetaTxnAbort = metric.Metadata{Name: "sql.txn.abort.count"}
MetaTxnRollback = metric.Metadata{Name: "sql.txn.rollback.count"}
MetaSelect = metric.Metadata{Name: "sql.select.count"}
MetaSQLExecLatency = metric.Metadata{Name: "sql.exec.latency"}
MetaDistSQLSelect = metric.Metadata{Name: "sql.distsql.select.count"}
MetaDistSQLExecLatency = metric.Metadata{Name: "sql.distsql.exec.latency"}
MetaUpdate = metric.Metadata{Name: "sql.update.count"}
MetaInsert = metric.Metadata{Name: "sql.insert.count"}
MetaDelete = metric.Metadata{Name: "sql.delete.count"}
MetaDdl = metric.Metadata{Name: "sql.ddl.count"}
MetaMisc = metric.Metadata{Name: "sql.misc.count"}
MetaQuery = metric.Metadata{Name: "sql.query.count"}
)
// distSQLExecMode controls if and when the Executor uses DistSQL.
type distSQLExecMode int

// The DistSQL execution modes, from least to most aggressive.
const (
	// distSQLOff: DistSQL is never used.
	distSQLOff distSQLExecMode = iota
	// distSQLAuto: the decision to use DistSQL is made automatically,
	// case by case.
	distSQLAuto
	// distSQLOn: DistSQL is used for every query that supports it.
	distSQLOn
	// distSQLAlways: only DistSQL is used; unsupported queries fail.
	distSQLAlways
)

// distSQLExecModeFromString converts the textual name of a DistSQL mode
// ("OFF", "AUTO", "ON" or "ALWAYS", matched case-insensitively) into the
// corresponding distSQLExecMode. It panics if val names no known mode.
func distSQLExecModeFromString(val string) distSQLExecMode {
	modesByName := map[string]distSQLExecMode{
		"OFF":    distSQLOff,
		"AUTO":   distSQLAuto,
		"ON":     distSQLOn,
		"ALWAYS": distSQLAlways,
	}
	mode, known := modesByName[strings.ToUpper(val)]
	if !known {
		panic(fmt.Sprintf("unknown DistSQL mode %s", val))
	}
	return mode
}
// defaultDistSQLMode controls the default DistSQL mode (see above). It can
// still be overridden per-session using `SET DIST_SQL = ...`.
// Its initial value comes from the COCKROACH_DISTSQL_MODE environment
// variable, with "OFF" passed as the default. The string goes through
// distSQLExecModeFromString, so an unrecognized value panics while the
// package is being initialized.
var defaultDistSQLMode = distSQLExecModeFromString(
envutil.EnvOrDefaultString("COCKROACH_DISTSQL_MODE", "OFF"),
)
// SetDefaultDistSQLMode replaces the package-wide default DistSQL mode with
// the mode named by the argument. The returned function puts back the mode
// that was in effect before the call. It panics (via
// distSQLExecModeFromString) if mode is not a known mode name, in which case
// the default is left untouched.
func SetDefaultDistSQLMode(mode string) func() {
	previous, next := defaultDistSQLMode, distSQLExecModeFromString(mode)
	defaultDistSQLMode = next
	return func() { defaultDistSQLMode = previous }
}
// traceResult pairs a statement tag with a count for display purposes.
type traceResult struct {
	tag   string
	count int
}

// String renders the result as "<tag> <count>". A negative count means there
// is no count to show, in which case only the tag is returned.
func (r *traceResult) String() string {
	if r.count >= 0 {
		return fmt.Sprintf("%s %d", r.tag, r.count)
	}
	return r.tag
}
// ResultList represents a list of results for a list of SQL statements.
// There is one result object per SQL statement in the request.
// Close must be called to release the resources held by the results.
type ResultList []Result
// StatementResults represents a list of results from running a batch of
// SQL statements, plus some meta info about the batch.
type StatementResults struct {
// ResultList holds one Result per statement that produced one.
ResultList
// Indicates that after parsing, the request contained 0 non-empty statements.
Empty bool
}
// Close releases the resources held by every result of the batch by closing
// the embedded ResultList.
func (s *StatementResults) Close(ctx context.Context) {
	results := s.ResultList
	results.Close(ctx)
}
// Close releases the resources held by each result in the list, in order.
func (rl ResultList) Close(ctx context.Context) {
	for i := range rl {
		rl[i].Close(ctx)
	}
}
// Result corresponds to the execution of a single SQL statement.
// Close must be called to release the resources held by Rows.
type Result struct {
// Err is the error, if any, associated with the statement.
Err error
// The type of statement that the result is for.
Type parser.StatementType
// The tag of the statement that the result is for.
PGTag string
// RowsAffected will be populated if the statement type is "RowsAffected".
RowsAffected int
// Columns will be populated if the statement type is "Rows". It will contain
// the names and types of the columns returned in the result set in the order
// specified in the SQL statement. The number of columns will equal the number
// of values in each Row.
Columns ResultColumns
// Rows will be populated if the statement type is "Rows". It will contain
// the result set of the result. It may be nil if the statement returned no
// rows or if an error occurred.
// TODO(nvanbenschoten): Can this be streamed from the planNode?
Rows *RowContainer
}
// Close releases the row container held by the result, if there is one.
func (r *Result) Close(ctx context.Context) {
	// A statement that returned no rows, or that failed, has no container
	// to release.
	if r.Rows == nil {
		return
	}
	r.Rows.Close(ctx)
}
// ResultColumn contains the name and type of a SQL "cell".
type ResultColumn struct {
// Name is the column's name.
Name string
// Typ is the column's SQL type.
Typ parser.Type
// If set, this is an implicit column; used internally.
hidden bool
// If set, a value won't be produced for this column; used internally.
omitted bool
}
// ResultColumns is the type used throughout the sql module to
// describe the column types of a table.
type ResultColumns []ResultColumn
// An Executor executes SQL statements.
// Executor is thread-safe.
// It is created with NewExecutor and must be started with Start, which sets
// up the distSQLPlanner, the database cache and the system config condition
// variable.
type Executor struct {
// cfg is the configuration passed to NewExecutor.
cfg ExecutorConfig
// stopper runs the background workers launched by Start.
stopper *stop.Stopper
reCache *parser.RegexpCache
// virtualSchemas is initialized by Start.
virtualSchemas virtualSchemaHolder
// Transient stats.
SelectCount *metric.Counter
// The subset of SELECTs that are processed through DistSQL.
DistSQLSelectCount *metric.Counter
DistSQLExecLatency *metric.Histogram
SQLExecLatency *metric.Histogram
TxnBeginCount *metric.Counter
// TxnCommitCount counts the number of times a COMMIT was attempted.
TxnCommitCount *metric.Counter
TxnAbortCount *metric.Counter
TxnRollbackCount *metric.Counter
UpdateCount *metric.Counter
InsertCount *metric.Counter
DeleteCount *metric.Counter
DdlCount *metric.Counter
MiscCount *metric.Counter
QueryCount *metric.Counter
// System Config and mutex.
// systemConfig and databaseCache are both replaced, under systemConfigMu,
// every time a new system config is received through gossip.
systemConfig config.SystemConfig
databaseCache *databaseCache
systemConfigMu syncutil.RWMutex
// This uses systemConfigMu in RLocker mode to not block
// execution of statements. So don't go on changing state after you've
// Wait()ed on it.
systemConfigCond *sync.Cond
// distSQLPlanner is created by Start.
distSQLPlanner *distSQLPlanner
// Application-level SQL statistics
sqlStats sqlStats
}
// An ExecutorConfig encompasses the auxiliary objects and configuration
// required to create an executor.
// All fields holding a pointer or an interface are required to create
// an Executor; the rest will have sane defaults set if omitted.
type ExecutorConfig struct {
// AmbientCtx is used by Executor.AnnotateCtx to annotate contexts.
AmbientCtx log.AmbientContext
NodeID *base.NodeIDContainer
DB *client.DB
Gossip *gossip.Gossip
DistSender *kv.DistSender
RPCContext *rpc.Context
LeaseManager *LeaseManager
Clock *hlc.Clock
DistSQLSrv *distsqlrun.ServerImpl
// TestingKnobs is dereferenced unconditionally when executing requests, so
// it must be non-nil even outside of tests.
TestingKnobs *ExecutorTestingKnobs
SchemaChangerTestingKnobs *SchemaChangerTestingKnobs
// HistogramWindowInterval is (server.Context).HistogramWindowInterval.
HistogramWindowInterval time.Duration
}
// Compile-time check that *ExecutorTestingKnobs satisfies
// base.ModuleTestingKnobs.
var _ base.ModuleTestingKnobs = (*ExecutorTestingKnobs)(nil)

// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface; it is
// a marker method and intentionally does nothing.
func (*ExecutorTestingKnobs) ModuleTestingKnobs() {}
// StatementFilter is the type of callback that
// ExecutorTestingKnobs.StatementFilter takes.
// The *Result argument lets the filter inspect or change the statement's
// result. The string argument is presumably the text of the executed
// statement; confirm at the call site, which is not in this part of the file.
type StatementFilter func(context.Context, string, *Result)
// ExecutorTestingKnobs is part of the context used to control parts of the
// system during testing. It implements base.ModuleTestingKnobs.
type ExecutorTestingKnobs struct {
// WaitForGossipUpdate causes metadata-mutating operations to wait
// for the new metadata to back-propagate through gossip.
// When set, execRequest also blocks system config updates for the duration
// of each request (see blockConfigUpdatesMaybe).
WaitForGossipUpdate bool
// CheckStmtStringChange causes Executor.execStmtsInCurrentTxn to verify
// that executed statements are not modified during execution.
CheckStmtStringChange bool
// StatementFilter can be used to trap execution of SQL statements and
// optionally change their results. The filter function is invoked after each
// statement has been executed.
StatementFilter StatementFilter
// DisableAutoCommit, if set, disables the auto-commit functionality of some
// SQL statements. That functionality allows some statements to commit
// directly when they're executed in an implicit SQL txn, without waiting for
// the Executor to commit the implicit txn.
// This has to be set in tests that need to abort such statements using a
// StatementFilter; otherwise, the statement commits immediately after
// execution so there'll be nothing left to abort by the time the filter runs.
DisableAutoCommit bool
}
// NewExecutor builds an Executor from the given configuration and stopper
// and creates all of its metrics. The returned Executor is not usable until
// Start has been called on it: Start is what registers for system config
// gossip updates and creates the distSQLPlanner.
func NewExecutor(cfg ExecutorConfig, stopper *stop.Stopper) *Executor {
	// TODO(mrtracy): See HistogramWindowInterval in server/config.go for the 6x factor.
	const latencyWindow = 6 * metricsSampleInterval

	exec := &Executor{
		cfg:      cfg,
		stopper:  stopper,
		reCache:  parser.NewRegexpCache(512),
		sqlStats: sqlStats{apps: make(map[string]*appStats)},
	}

	// Transaction lifecycle counters.
	exec.TxnBeginCount = metric.NewCounter(MetaTxnBegin)
	exec.TxnCommitCount = metric.NewCounter(MetaTxnCommit)
	exec.TxnAbortCount = metric.NewCounter(MetaTxnAbort)
	exec.TxnRollbackCount = metric.NewCounter(MetaTxnRollback)

	// SELECT counters and execution latencies.
	exec.SelectCount = metric.NewCounter(MetaSelect)
	exec.DistSQLSelectCount = metric.NewCounter(MetaDistSQLSelect)
	exec.DistSQLExecLatency = metric.NewLatency(MetaDistSQLExecLatency, latencyWindow)
	exec.SQLExecLatency = metric.NewLatency(MetaSQLExecLatency, latencyWindow)

	// Counters for the remaining statement kinds.
	exec.UpdateCount = metric.NewCounter(MetaUpdate)
	exec.InsertCount = metric.NewCounter(MetaInsert)
	exec.DeleteCount = metric.NewCounter(MetaDelete)
	exec.DdlCount = metric.NewCounter(MetaDdl)
	exec.MiscCount = metric.NewCounter(MetaMisc)
	exec.QueryCount = metric.NewCounter(MetaQuery)

	return exec
}
// Start starts workers for the executor and initializes the distSQLPlanner.
//
// It also creates the initial database cache and the condition variable used
// to wait for system config updates, launches the periodic reset of the SQL
// statistics, and initializes the virtual schemas using a temporary "startup"
// session whose memory usage is reported to startupMemMetrics. A failure to
// initialize the virtual schemas is fatal.
func (e *Executor) Start(
ctx context.Context, startupMemMetrics *MemoryMetrics, nodeDesc roachpb.NodeDescriptor,
) {
ctx = e.AnnotateCtx(ctx)
log.Infof(ctx, "creating distSQLPlanner with address %s", nodeDesc.Address)
e.distSQLPlanner = newDistSQLPlanner(
nodeDesc, e.cfg.RPCContext, e.cfg.DistSQLSrv, e.cfg.DistSender, e.cfg.Gossip,
)
e.databaseCache = newDatabaseCache(e.systemConfig)
// The condition variable is built on the read side of systemConfigMu, so
// waiters hold only a read lock.
e.systemConfigCond = sync.NewCond(e.systemConfigMu.RLocker())
gossipUpdateC := e.cfg.Gossip.RegisterSystemConfigChannel()
// Worker that installs every new system config received through gossip,
// until the stopper asks it to stop.
e.stopper.RunWorker(func() {
for {
select {
case <-gossipUpdateC:
// NOTE(review): the second return value of GetSystemConfig is discarded
// here; confirm that installing the returned config is correct in that case.
sysCfg, _ := e.cfg.Gossip.GetSystemConfig()
e.updateSystemConfig(sysCfg)
case <-e.stopper.ShouldStop():
return
}
}
})
// Until per-statement statistics are properly recorded and
// scrubbed, we clear them periodically.
// TODO(dt): remove this.
e.sqlStats.startResetWorker(e.stopper)
// Initialize the virtual schemas with a short-lived session that is
// finished before Start returns.
ctx = log.WithLogTag(ctx, "startup", nil)
startupSession := NewSession(ctx, SessionArgs{}, e, nil, startupMemMetrics)
startupSession.StartUnlimitedMonitor()
if err := e.virtualSchemas.init(ctx, startupSession.newPlanner(e, nil)); err != nil {
log.Fatal(ctx, err)
}
startupSession.Finish(e)
}
// SetDistSQLSpanResolver installs spanResolver as the SpanResolver of the
// executor's distSQLPlanner. The caller must make sure that no query is
// running through DistSQL while the resolver is being swapped.
func (e *Executor) SetDistSQLSpanResolver(spanResolver distsqlplan.SpanResolver) {
	planner := e.distSQLPlanner
	planner.setSpanResolver(spanResolver)
}
// AnnotateCtx annotates ctx using the executor's ambient context; it is a
// convenience wrapper, see AmbientContext.
func (e *Executor) AnnotateCtx(ctx context.Context) context.Context {
	ambient := &e.cfg.AmbientCtx
	return ambient.AnnotateCtx(ctx)
}
// updateSystemConfig installs a new system config. It is invoked every time
// the system config gossip entry changes. Under the write lock it replaces
// the config, rebuilds the database cache from it, and wakes up everybody
// waiting for a config update.
func (e *Executor) updateSystemConfig(cfg config.SystemConfig) {
	e.systemConfigMu.Lock()
	defer e.systemConfigMu.Unlock()

	// A new system config invalidates the previous database cache.
	e.databaseCache = newDatabaseCache(cfg)
	e.systemConfig = cfg

	e.systemConfigCond.Broadcast()
}
// getDatabaseCache returns the current database cache, i.e. the one built
// from the latest system config. The pointer is read under the read lock.
func (e *Executor) getDatabaseCache() *databaseCache {
	e.systemConfigMu.RLock()
	defer e.systemConfigMu.RUnlock()
	return e.databaseCache
}
// Prepare returns the result types of the given statement. pinfo may
// contain partial type information for placeholders. Prepare will
// populate the missing types.
//
// query must contain at most one statement; more than one is an error. On
// success a non-nil PreparedStatement is always returned: for an empty query,
// or for a statement that produces no plan, it simply carries no Columns. A
// nil PreparedStatement is only returned together with a non-nil error.
//
// Prepare panics if the session's default isolation level cannot be set on
// the transaction used for planning.
func (e *Executor) Prepare(
query string, session *Session, pinfo parser.PlaceholderTypes,
) (*PreparedStatement, error) {
session.resetForBatch(e)
log.VEventf(session.Ctx(), 2, "preparing: %s", query)
// Note: this local variable shadows the parser package for the rest of the
// function.
var parser parser.Parser
stmts, err := parser.Parse(query, session.Syntax)
if err != nil {
return nil, err
}
prepared := &PreparedStatement{
Query: query,
SQLTypes: pinfo,
portalNames: make(map[string]struct{}),
}
switch len(stmts) {
case 0:
// Empty query: nothing to plan.
return prepared, nil
case 1:
// ignore
default:
return nil, errors.Errorf("expected 1 statement, but found %d", len(stmts))
}
stmt := stmts[0]
prepared.Type = stmt.StatementType()
if err = pinfo.ProcessPlaceholderAnnotations(stmt); err != nil {
return nil, err
}
// protoTS is non-nil when the statement carries an AS OF SYSTEM TIME clause.
protoTS, err := isAsOf(session, stmt, e.cfg.Clock.Now())
if err != nil {
return nil, err
}
// Prepare needs a transaction because it needs to retrieve db/table
// descriptors for type checking.
// TODO(andrei): is this OK? If we're preparing as part of a SQL txn, how do
// we check that they're reading descriptors consistent with the txn in which
// they'll be used?
txn := client.NewTxn(e.cfg.DB)
if err := txn.SetIsolation(session.DefaultIsolationLevel); err != nil {
panic(err)
}
txn.Proto().OrigTimestamp = e.cfg.Clock.Now()
planner := session.newPlanner(e, txn)
planner.semaCtx.Placeholders.SetTypes(pinfo)
planner.evalCtx.PrepareOnly = true
if protoTS != nil {
// For AS OF SYSTEM TIME, plan against the descriptors as of the requested
// timestamp rather than cached ones.
planner.avoidCachedDescriptors = true
SetTxnTimestamps(txn, *protoTS)
}
plan, err := planner.prepare(session.Ctx(), stmt)
if err != nil {
return nil, err
}
if plan == nil {
return prepared, nil
}
defer plan.Close(session.Ctx())
prepared.Columns = plan.Columns()
// Reject the statement if any of its result column types fails
// checkResultType.
for _, c := range prepared.Columns {
if err := checkResultType(c.Typ); err != nil {
return nil, err
}
}
return prepared, nil
}
// ExecuteStatements executes the given statement(s) and returns a response.
//
// stmts is the SQL text to parse and execute in the context of session, and
// pinfo carries the placeholder information for the statements. The returned
// StatementResults holds one Result per executed statement, with
// application-level errors stored in the individual results.
//
// If execution panics, the panic is re-raised as an error whose message is
// prefixed with the SQL text that was being executed.
func (e *Executor) ExecuteStatements(
	session *Session, stmts string, pinfo *parser.PlaceholderInfo,
) StatementResults {
	session.resetForBatch(e)
	session.phaseTimes[sessionStartBatch] = timeutil.Now()

	defer func() {
		if r := recover(); r != nil {
			// On a panic, prepend the executed SQL. The recovered value can be
			// of any type, so it is formatted with %v: %s would render values
			// that are neither strings nor errors as "%!s(T=...)".
			panic(fmt.Errorf("%s: %v", stmts, r))
		}
	}()

	// Send the Request for SQL execution and set the application-level error
	// for each result in the reply.
	return e.execRequest(session, stmts, pinfo, copyMsgNone)
}
// CopyData feeds data to the session's COPY buffer; the buffered rows are
// executed once there are enough of them. No placeholder information is
// used for COPY data.
func (e *Executor) CopyData(session *Session, data string) StatementResults {
	var noPlaceholders *parser.PlaceholderInfo
	return e.execRequest(session, data, noPlaceholders, copyMsgData)
}
// CopyDone signals the end of the COPY input and executes whatever data is
// still buffered. It carries no SQL text and no placeholder information.
func (e *Executor) CopyDone(session *Session) StatementResults {
	var noPlaceholders *parser.PlaceholderInfo
	return e.execRequest(session, "", noPlaceholders, copyMsgDone)
}
// CopyEnd leaves COPY mode: the session's COPY state is closed, discarding
// any data still buffered, and then cleared from the session.
func (session *Session) CopyEnd(ctx context.Context) {
	pending := session.copyFrom
	pending.Close(ctx)
	session.copyFrom = nil
}
// blockConfigUpdates prevents gossip updates from changing the system config
// by taking the lock associated with the system config condition variable.
// The returned function releases that lock again. Useful in tests.
func (e *Executor) blockConfigUpdates() func() {
	unblock := func() {
		e.systemConfigCond.L.Unlock()
	}
	e.systemConfigCond.L.Lock()
	return unblock
}
// blockConfigUpdatesMaybe blocks system config updates when the
// WaitForGossipUpdate testing knob is set, and does nothing otherwise.
//
// Blocking the updates makes it possible to run
// checkTestingVerifyMetadataInitialOrDie() later on: with the system config
// locked, no gossip update can sneak in under us, so we can assert that the
// verify callback only succeeds after a gossip update.
//
// The returned function undoes the blocking (it is a no-op when nothing was
// blocked); call it after checkTestingVerifyMetadata{Initial}OrDie() has run.
//
// Taking this lock does not change semantics. Even outside of tests, the
// Executor uses a static systemConfig for a user request, so locking the
// Executor's systemConfig cannot change the semantics of the SQL operation
// being performed under lock.
func (e *Executor) blockConfigUpdatesMaybe() func() {
	if e.cfg.TestingKnobs.WaitForGossipUpdate {
		return e.blockConfigUpdates()
	}
	return func() {}
}
// waitForConfigUpdate suspends the caller until the next SystemConfig
// arrives through gossip. The caller must have called blockConfigUpdates()
// beforehand, since waiting on the condition variable requires its lock to
// be held.
func (e *Executor) waitForConfigUpdate() {
	cond := e.systemConfigCond
	cond.Wait()
}
// execRequest executes the request in the provided Session.
// It parses the sql into statements, iterates through the statements, creates
// KV transactions and automatically retries them when possible, and executes
// the (synchronous attempt of) schema changes.
// It will accumulate a result in Response for each statement.
// It will resume a SQL transaction, if one was previously open for this client.
//
// execRequest handles the mismatch between the SQL interface that the Executor
// provides, based on statements being streamed from the client in the context
// of a session, and the KV client.Txn interface, based on (possibly-retriable)
// callbacks passed to be executed in the context of a transaction. Actual
// execution of statements in the context of a KV txn is delegated to
// runTxnAttempt().
//
// When the session is in COPY mode (session.copyFrom is set), sql and copymsg
// are handed to the session's COPY processing instead of the SQL parser;
// outside of COPY mode any copymsg other than copyMsgNone is an error.
//
// The returned StatementResults has Empty set if parsing yielded no
// statements. A parse error yields a single Result carrying the error, and
// rolls back the session's transaction if one is open.
func (e *Executor) execRequest(
session *Session, sql string, pinfo *parser.PlaceholderInfo, copymsg copyMsg,
) StatementResults {
var res StatementResults
var stmts parser.StatementList
var avoidCachedDescriptors bool
var err error
txnState := &session.TxnState
if log.V(2) {
log.Infof(session.Ctx(), "execRequest: %s", sql)
}
session.phaseTimes[sessionStartParse] = timeutil.Now()
if session.copyFrom != nil {
stmts, err = session.ProcessCopyData(session.Ctx(), sql, copymsg)
} else if copymsg != copyMsgNone {
err = fmt.Errorf("unexpected copy command")
} else {
var parser parser.Parser
stmts, err = parser.Parse(sql, session.Syntax)
}
session.phaseTimes[sessionEndParse] = timeutil.Now()
if err != nil {
if log.V(2) {
log.Infof(session.Ctx(), "execRequest: error: %v", err)
}
// A parse error occurred: we can't determine if there were multiple
// statements or only one, so just pretend there was one.
if txnState.txn != nil {
// Rollback the txn.
txnState.updateStateAndCleanupOnErr(err, e)
}
res.ResultList = append(res.ResultList, Result{Err: err})
return res
}
if len(stmts) == 0 {
res.Empty = true
return res
}
// If the Executor wants config updates to be blocked, then block them.
// The unblock function returned by blockConfigUpdatesMaybe is deferred, so
// updates stay blocked until execRequest returns.
defer e.blockConfigUpdatesMaybe()()
for len(stmts) > 0 {
// Each iteration consumes a transaction's worth of statements.
inTxn := txnState.State != NoTxn
execOpt := client.TxnExecOptions{
AssignTimestampImmediately: true,
}
// Figure out the statements out of which we're going to try to consume
// this iteration. If we need to create an implicit txn, only one statement
// can be consumed.
stmtsToExec := stmts
// If protoTS is set, the transaction proto sets its Orig and Max timestamps
// to it each retry.
var protoTS *hlc.Timestamp
// We can AutoRetry the next batch of statements if we're in a clean state
// (i.e. the next statements we're going to see are the first statements in
// a transaction).
if !inTxn {
// Detect implicit transactions.
if _, isBegin := stmts[0].(*parser.BeginTransaction); !isBegin {
execOpt.AutoCommit = true
stmtsToExec = stmtsToExec[:1]
// Check for AS OF SYSTEM TIME. If it is present but not detected here,
// it will raise an error later on.
protoTS, err = isAsOf(session, stmtsToExec[0], e.cfg.Clock.Now())
if err != nil {
res.ResultList = append(res.ResultList, Result{Err: err})
return res
}
if protoTS != nil {
// When running AS OF SYSTEM TIME queries, we want to use the
// table descriptors from the specified time, and never lease
// anything. To do this, we pass down the avoidCachedDescriptors
// flag and set the transaction's timestamp to the specified time.
avoidCachedDescriptors = true
}
}
txnState.resetForNewSQLTxn(e, session)
txnState.autoRetry = true
txnState.sqlTimestamp = e.cfg.Clock.PhysicalTime()
if execOpt.AutoCommit {
txnState.txn.SetDebugName(sqlImplicitTxnName)
} else {
txnState.txn.SetDebugName(sqlTxnName)
}
} else {
txnState.autoRetry = false
}
execOpt.AutoRetry = txnState.autoRetry
if txnState.State == NoTxn {
panic("we failed to initialize a txn")
}
// Now actually run some statements.
var remainingStmts parser.StatementList
var results []Result
origState := txnState.State
// Track if we are retrying this query, so that we do not double count.
automaticRetryCount := 0
// Number of schema changers queued before this batch; anything beyond it
// was added by an attempt of the closure below.
schemaChangerCount := len(txnState.schemaChangers.schemaChangers)
// txnClosure is one attempt at running stmtsToExec in the given KV txn; it
// is passed to txn.Exec below and bumps automaticRetryCount each time it
// returns.
txnClosure := func(ctx context.Context, txn *client.Txn, opt *client.TxnExecOptions) error {
defer func() { automaticRetryCount++ }()
if txnState.State == Open && txnState.txn != txn {
panic(fmt.Sprintf("closure wasn't called in the txn we set up for it."+
"\ntxnState.txn:%+v\ntxn:%+v\ntxnState:%+v", txnState.txn, txn, txnState))
}
txnState.txn = txn
// Remove all schema changers added by the closure.
if automaticRetryCount > 0 && len(txnState.schemaChangers.schemaChangers) > 0 {
txnState.schemaChangers.schemaChangers =
txnState.schemaChangers.schemaChangers[:schemaChangerCount]
}
if protoTS != nil {
SetTxnTimestamps(txnState.txn, *protoTS)
}
var err error
if results != nil {
// Some results were produced by a previous attempt. Discard them.
ResultList(results).Close(ctx)
}
results, remainingStmts, err = runTxnAttempt(
e, session, stmtsToExec, pinfo, origState, opt,
avoidCachedDescriptors, automaticRetryCount)
// TODO(andrei): Until #7881 fixed.
if err == nil && txnState.State == Aborted {
doWarn := true
if len(stmtsToExec) > 0 {
if _, ok := stmtsToExec[0].(*parser.ShowTransactionStatus); ok {
doWarn = false
}
}
if doWarn {
log.Errorf(ctx,
"7881: txnState is Aborted without an error propagating. stmtsToExec: %s, "+
"results: %+v, remainingStmts: %s, txnState: %+v", stmtsToExec, results,
remainingStmts, txnState)
}
}
return err
}
// This is where the magic happens - we ask db to run a KV txn and possibly retry it.
txn := txnState.txn // this might be nil if the txn was already aborted.
err := txn.Exec(session.Ctx(), execOpt, txnClosure)
if err != nil && len(results) > 0 {
// Set or override the error in the last result, if any.
// The error might have come from auto-commit, in which case it wasn't
// captured in a result. Or, we might have had a RetryableTxnError that
// got converted to a non-retryable error when the txn closure was done.
lastRes := &results[len(results)-1]
lastRes.Err = convertToErrWithPGCode(err)
}
if err != nil && log.V(2) {
log.Infof(session.Ctx(), "execRequest: error: %v", err)
}
// Update the Err field of the last result if the error was coming from
// auto commit. The error was generated outside of the txn closure, so it was not
// set in any result.
if err != nil {
if aErr, ok := err.(*client.AutoCommitError); ok {
// TODO(andrei): Until #7881 fixed.
{
if txnState.txn != nil {
log.Eventf(session.Ctx(), "executor got AutoCommitError: %s\n"+
"txn: %+v\nexecOpt.AutoRetry %t, execOpt.AutoCommit:%t, stmts %+v, remaining %+v",
aErr, txnState.txn.Proto(), execOpt.AutoRetry, execOpt.AutoCommit, stmts,
remainingStmts)
} else {
log.Errorf(session.Ctx(), "7881: AutoCommitError on nil txn: %s, "+
"txnState %+v, execOpt %+v, stmts %+v, remaining %+v, txn captured before executing batch: %+v",
aErr, txnState, execOpt, stmts, remainingStmts, txn)
txnState.sp.SetBaggageItem(keyFor7881Sample, "sample me please")
}
}
e.TxnAbortCount.Inc(1)
// TODO(andrei): Once 7881 is fixed, this should be
// txnState.txn.CleanupOnError().
txn.CleanupOnError(session.Ctx(), err)
}
}
// Sanity check about not leaving KV txns open on errors.
if err != nil && txnState.txn != nil && !txnState.txn.IsFinalized() {
if _, retryable := err.(*roachpb.RetryableTxnError); !retryable {
log.Fatalf(session.Ctx(), "got a non-retryable error but the KV "+
"transaction is not finalized. TxnState: %s, err: %s\n"+
"err:%+v\n\ntxn: %s", txnState.State, err, err, txnState.txn.Proto())
}
}
res.ResultList = append(res.ResultList, results...)
// Now make sense of the state we got into and update txnState.
if (txnState.State == RestartWait || txnState.State == Aborted) &&
txnState.commitSeen {
// A COMMIT got an error (retryable or not). Too bad, this txn is toast.
// After we return a result for COMMIT (with the COMMIT pgwire tag), the
// user can't send any more commands.
e.TxnAbortCount.Inc(1)
txn.CleanupOnError(session.Ctx(), err)
txnState.resetStateAndTxn(NoTxn)
}
if execOpt.AutoCommit {
// If execOpt.AutoCommit was set, then the txn no longer exists at this point.
txnState.resetStateAndTxn(NoTxn)
}
// If we're no longer in a transaction, finish the trace.
if txnState.State == NoTxn {
txnState.finishSQLTxn(session.context)
}
// If the txn is in any state but Open, exec the schema changes. They'll
// short-circuit themselves if the mutation that queued them has been
// rolled back from the table descriptor.
// stmtsToExec is a prefix of stmts, so the statements consumed by this
// iteration are the leading ones that were not returned as remaining.
stmtsExecuted := stmts[:len(stmtsToExec)-len(remainingStmts)]
if txnState.State != Open {
session.checkTestingVerifyMetadataInitialOrDie(e, stmts)
session.checkTestingVerifyMetadataOrDie(e, stmtsExecuted)
// Exec the schema changers (if the txn rolled back, the schema changers
// will short-circuit because the corresponding descriptor mutation is not
// found).
session.leases.releaseLeases(session.Ctx())
txnState.schemaChangers.execSchemaChanges(session.Ctx(), e, session, res.ResultList)
} else {
// We're still in a txn, so we only check that the verifyMetadata callback
// fails the first time it's run. The gossip update that will make the
// callback succeed only happens when the txn is done.
session.checkTestingVerifyMetadataInitialOrDie(e, stmtsExecuted)
}
// Figure out what statements to run on the next iteration.
if err != nil {
// Don't execute anything further.
stmts = nil
} else if execOpt.AutoCommit {
// An implicit txn consumed exactly one statement.
stmts = stmts[1:]
} else {
stmts = remainingStmts
}
}
return res
}
// countRowsAffected returns the number of rows affected by the plan p.
// When p offers a fast path that yields a result, its count is used as is.
// Otherwise the plan is drained by calling p.Next() until it reports no more
// rows, and the rows seen are counted. The error of the last Next call is
// returned along with the count.
func countRowsAffected(ctx context.Context, p planNode) (int, error) {
	if fast, hasFastPath := p.(planNodeFastPath); hasFastPath {
		if n, ok := fast.FastPathResults(); ok {
			return n, nil
		}
	}

	n := 0
	for {
		more, err := p.Next(ctx)
		if !more {
			return n, err
		}
		n++
	}
}
// runTxnAttempt performs a single attempt at executing stmts in the
// session's current transaction. It is the body of the closure passed to
// txn.Exec() and may therefore be invoked several times (if opt.AutoRetry
// is set).
//
// opt.AutoCommit is forwarded to execStmtsInCurrentTxn as implicitTxn and
// opt.AutoRetry as txnBeginning. The results, the statements left
// unexecuted and the error are returned exactly as execStmtsInCurrentTxn
// produced them. The function panics if an implicit (auto-commit)
// transaction leaves statements unexecuted.
func runTxnAttempt(
	e *Executor,
	session *Session,
	stmts parser.StatementList,
	pinfo *parser.PlaceholderInfo,
	origState TxnStateEnum,
	opt *client.TxnExecOptions,
	avoidCachedDescriptors bool,
	automaticRetryCount int,
) ([]Result, parser.StatementList, error) {
	// On a retry, discard whatever state the previous run of this closure
	// left behind. Guarding these writes to txnState with the
	// automaticRetryCount condition guarantees that no asynchronous
	// statements are still executing and reading from the state, so no
	// synchronization is needed to prevent data races.
	if automaticRetryCount > 0 {
		txnState := &session.TxnState
		txnState.State = origState
		txnState.commitSeen = false
	}

	implicitTxn := opt.AutoCommit
	txnBeginning := opt.AutoRetry
	results, leftover, err := e.execStmtsInCurrentTxn(
		session, stmts, pinfo, implicitTxn, txnBeginning,
		avoidCachedDescriptors, automaticRetryCount,
	)
	if implicitTxn && len(leftover) != 0 {
		panic("implicit txn failed to execute all stmts")
	}
	return results, leftover, err
}
// execStmtsInCurrentTxn executes the leading statements of stmts that belong
// to a single SQL transaction. It runs them in the session's current
// transaction, which must already exist: the function panics if the
// transaction state is NoTxn on entry.
//
// A COMMIT/ROLLBACK statement can end the current transaction. When that
// happens the method stops and hands the statements it has not executed back
// to the caller.
//
// When a statement returns an error, the SQL txn is considered aborted and
// all subsequent statements are discarded: they are not executed, they are
// not returned for future execution, and no results are generated for them.
// This includes COMMIT/ROLLBACK statements, and errTransactionAborted is no
// exception - encountering it discards the subsequent statements too.
// Consequently, to recover from an aborted txn, a COMMIT/ROLLBACK statement
// needs to be the first one in stmts.
//
// SHOW TRANSACTION STATUS is dispatched on its own code path so that it is
// executed regardless of the current transaction state.
//
// Args:
//  session: the session to execute the statements in.
//  stmts: the semicolon-separated list of statements to execute.
//  pinfo: the placeholders to use in the statements.
//  implicitTxn: set if the current transaction was implicitly
//   created by the system (i.e. the client sent the statement outside of
//   a transaction).
//   COMMIT/ROLLBACK statements are rejected if set. Also, the transaction
//   might be auto-committed in this function.
//  txnBeginning: when set, the statement at index 0 of stmts is passed to
//   execStmtInOpenTxn with firstInTxn set; all other statements get false.
//  avoidCachedDescriptors: set if the statement execution should avoid
//   using cached descriptors.
//  automaticRetryCount: increases with each retry; 0 for the first attempt.
//
// Returns:
//  - the list of results (one per executed statement).
//  - the statements that haven't been executed because the transaction has
//    been committed or rolled back. When an error is returned, this is nil.
//  - the error encountered while executing statements, if any. If an error
//    occurred, it corresponds to the last result returned and subsequent
//    statements have not been executed. Note that usually the error is not
//    reflected in this last result; the caller is responsible for copying it
//    into the result after converting it adequately.
func (e *Executor) execStmtsInCurrentTxn(
	session *Session,
	stmts parser.StatementList,
	pinfo *parser.PlaceholderInfo,
	implicitTxn bool,
	txnBeginning bool,
	avoidCachedDescriptors bool,
	automaticRetryCount int,
) ([]Result, parser.StatementList, error) {
	txnState := &session.TxnState
	if txnState.State == NoTxn {
		panic("execStmtsInCurrentTransaction called outside of a txn")
	}

	var results []Result
	for idx, stmt := range stmts {
		if log.V(2) || log.HasSpanOrEvent(session.Ctx()) {
			log.VEventf(session.Ctx(), 2, "executing %d/%d: %s", idx+1, len(stmts), stmt)
		}
		txnState.schemaChangers.curStatementIdx = idx

		// Decide whether to verify that executing the statement does not
		// alter its textual representation.
		//
		// TODO(nvanbenschoten) Constant literals can change their
		// representation (1.0000 -> 1) when type checking, so we need to
		// reconsider how this works. Until then the CheckStmtStringChange
		// knob is effectively disabled.
		//
		// The verification is still performed whenever a StatementFilter is
		// installed, because the StatementFilter relies on the textual
		// representation of statements not changing from what the client
		// sends.
		verifyStmtStr := (e.cfg.TestingKnobs.CheckStmtStringChange && false) ||
			(e.cfg.TestingKnobs.StatementFilter != nil)
		var stmtStrBefore string
		if verifyStmtStr {
			stmtStrBefore = stmt.String()
		}

		var (
			res Result
			err error
		)
		_, isShowTxnStatus := stmt.(*parser.ShowTransactionStatus)
		switch {
		case isShowTxnStatus:
			// Handled before looking at the txn state so that it is always
			// guaranteed to execute regardless of the current state.
			res, err = runShowTransactionState(session, implicitTxn)
		case txnState.State == Open:
			firstInTxn := txnBeginning && idx == 0
			res, err = e.execStmtInOpenTxn(
				session, stmt, pinfo, implicitTxn, firstInTxn,
				avoidCachedDescriptors, automaticRetryCount,
			)
		case txnState.State == Aborted || txnState.State == RestartWait:
			res, err = e.execStmtInAbortedTxn(session, stmt)
		case txnState.State == CommitWait:
			res, err = e.execStmtInCommitWaitTxn(session, stmt)
		default:
			panic(fmt.Sprintf("unexpected txn state: %s", txnState.State))
		}

		// The string-stability verification only applies to statements that
		// went through the state-based dispatch above.
		if verifyStmtStr && !isShowTxnStatus {
			if after := stmt.String(); after != stmtStrBefore {
				panic(fmt.Sprintf("statement changed after exec; before:\n %s\nafter:\n %s",
					stmtStrBefore, after))
			}
		}

		if filter := e.cfg.TestingKnobs.StatementFilter; filter != nil {
			filter(session.Ctx(), stmt.String(), &res)
		}

		results = append(results, res)
		switch {
		case err != nil:
			// After an error happened, skip executing all the remaining
			// statements in this batch. This is Postgres behavior, and it
			// makes sense as the protocol doesn't let you return results
			// after an error.
			return results, nil, err
		case txnState.State == NoTxn:
			// The transaction is done; return the remaining statements so
			// they can be executed as a different group.
			return results, stmts[idx+1:], nil
		}
	}

	// All statements were consumed and we're still in a txn.
	return results, nil, nil
}
// runShowTransactionState returns the state of current transaction.
func runShowTransactionState(session *Session, implicitTxn bool) (Result, error) {
var result Result
result.PGTag = (*parser.Show)(nil).StatementTag()
result.Type = (*parser.Show)(nil).StatementType()
result.Columns = ResultColumns{{Name: "TRANSACTION STATUS", Typ: parser.TypeString}}
result.Rows = NewRowContainer(session.makeBoundAccount(), result.Columns, 0)
state := session.TxnState.State
if implicitTxn {