Skip to content

Commit c74c96a

Browse files
committed
sql: introduce connExecutor, the query execution orchestrator
Release note: None The connExecutor is the top dog that interfaces with a pgwire connection (through the clientComm and CommandResult interfaces), consumes a stream of queries and produces a stream of results. It encapsulates the connection state machine for which it produces events. It interfaces with the two SQL execution engines for actually running queries. Its main responsibility is to dispatch statements based on the current state (in txn, not in txn, in aborted txn, etc) and to handle execution in all states but Open (in Open it talks to an execution engine). It also handles other commands than executiong queries: preparing, binding, etc and it maintains the session state associated with prepares statements.
1 parent 4747d30 commit c74c96a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+5380
-748
lines changed

pkg/cli/cliflags/flags.go

+6
Original file line numberDiff line numberDiff line change
@@ -690,4 +690,10 @@ in the history of the cluster.`,
690690
When no node ID is specified, also lists all nodes that have been decommissioned
691691
in the history of the cluster.`,
692692
}
693+
694+
// !!!
695+
UseFrontendV2 = FlagInfo{
696+
Name: "use-frontend-v2",
697+
Description: CertsDir.Description,
698+
}
693699
)

pkg/cli/flags.go

+1
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ func init() {
211211

212212
VarFlag(f, &serverCfg.Stores, cliflags.Store)
213213
VarFlag(f, &serverCfg.MaxOffset, cliflags.MaxOffset)
214+
BoolFlag(f, &serverCfg.UseFrontendV2, cliflags.UseFrontendV2, serverCfg.UseFrontendV2)
214215

215216
// Usage for the unix socket is odd as we use a real file, whereas
216217
// postgresql and clients consider it a directory and build a filename

pkg/gossip/gossip.go

+1
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,7 @@ func (g *Gossip) GetSystemConfig() (config.SystemConfig, bool) {
959959
// RegisterSystemConfigChannel registers a channel to signify updates for the
960960
// system config. It is notified after registration, and whenever a new
961961
// system config is successfully unmarshaled.
962+
// !!! comment
962963
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
963964
g.systemConfigMu.Lock()
964965
defer g.systemConfigMu.Unlock()

pkg/internal/client/txn.go

+1
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ func (txn *Txn) Exec(
797797
// book-keeping.
798798
//
799799
// TODO(andrei): I think this is called in the wrong place. See #18170.
800+
// !!! still used?
800801
func (txn *Txn) PrepareForRetry(ctx context.Context, err error) {
801802
txn.commitTriggers = nil
802803
log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s",

pkg/server/admin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *adminServer) NewContextAndSessionForRPC(
157157
) (context.Context, *sql.Session) {
158158
ctx = s.server.AnnotateCtx(ctx)
159159
session := sql.NewSession(
160-
ctx, args, s.server.sqlExecutor, nil /* remote */, s.memMetrics, nil /* conn */)
160+
ctx, args, s.server.sqlExecutor, s.memMetrics, nil /* conn */)
161161
session.StartMonitor(&s.memMonitor, mon.BoundAccount{})
162162
return ctx, session
163163
}

pkg/server/admin_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func TestAdminAPIDatabases(t *testing.T) {
240240
const testdb = "test"
241241
session := sql.NewSession(
242242
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
243-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
243+
&sql.MemoryMetrics{}, nil /* conn */)
244244
session.StartUnlimitedMonitor()
245245
defer session.Finish(ts.sqlExecutor)
246246
query := "CREATE DATABASE " + testdb
@@ -412,7 +412,7 @@ func TestAdminAPITableDetails(t *testing.T) {
412412

413413
session := sql.NewSession(
414414
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
415-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
415+
&sql.MemoryMetrics{}, nil /* conn */)
416416
session.StartUnlimitedMonitor()
417417
defer session.Finish(ts.sqlExecutor)
418418
setupQueries := []string{
@@ -557,7 +557,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {
557557
defer span.Finish()
558558
session := sql.NewSession(
559559
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
560-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
560+
&sql.MemoryMetrics{}, nil /* conn */)
561561
session.StartUnlimitedMonitor()
562562
setupQueries := []string{
563563
"CREATE DATABASE test",
@@ -668,7 +668,7 @@ func TestAdminAPIUsers(t *testing.T) {
668668
defer span.Finish()
669669
session := sql.NewSession(
670670
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
671-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
671+
&sql.MemoryMetrics{}, nil /* conn */)
672672
session.StartUnlimitedMonitor()
673673
defer session.Finish(ts.sqlExecutor)
674674
query := `
@@ -713,7 +713,7 @@ func TestAdminAPIEvents(t *testing.T) {
713713
defer span.Finish()
714714
session := sql.NewSession(
715715
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
716-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
716+
&sql.MemoryMetrics{}, nil /* conn */)
717717
session.StartUnlimitedMonitor()
718718
defer session.Finish(ts.sqlExecutor)
719719
setupQueries := []string{

pkg/server/authentication.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (s *authenticationServer) verifyPassword(
238238
ctx context.Context, username string, password string,
239239
) (bool, error) {
240240
exists, hashedPassword, err := sql.GetUserHashedPassword(
241-
ctx, s.server.sqlExecutor, s.memMetrics, username,
241+
ctx, s.server.execCfg, s.memMetrics, username,
242242
)
243243
if err != nil {
244244
return false, err

pkg/server/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const (
7171
TempDirsRecordFilename = "temp-dirs-record.txt"
7272
defaultEventLogEnabled = true
7373
defaultEnableWebSessionAuthentication = false
74+
defaultUseFrontendV2 = true
7475

7576
maximumMaxClockOffset = 5 * time.Second
7677

@@ -234,6 +235,9 @@ type Config struct {
234235
// the Admin API's HTTP endpoints.
235236
EnableWebSessionAuthentication bool
236237

238+
// !!!
239+
UseFrontendV2 bool
240+
237241
enginesCreated bool
238242
}
239243

@@ -366,6 +370,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
366370
ScanMaxIdleTime: defaultScanMaxIdleTime,
367371
EventLogEnabled: defaultEventLogEnabled,
368372
EnableWebSessionAuthentication: defaultEnableWebSessionAuthentication,
373+
UseFrontendV2: defaultUseFrontendV2,
369374
Stores: base.StoreSpecList{
370375
Specs: []base.StoreSpec{storeSpec},
371376
},

pkg/server/server.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
521521
execCfg.EvalContextTestingKnobs = *sqlEvalContext.(*tree.EvalContextTestingKnobs)
522522
}
523523
s.sqlExecutor = sql.NewExecutor(execCfg, s.stopper)
524-
s.registry.AddMetricStruct(s.sqlExecutor)
524+
if !s.cfg.UseFrontendV2 {
525+
s.registry.AddMetricStruct(s.sqlExecutor)
526+
}
525527

526528
s.pgServer = pgwire.MakeServer(
527529
s.cfg.AmbientCtx,
@@ -531,8 +533,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
531533
&s.internalMemMetrics,
532534
&rootSQLMemoryMonitor,
533535
s.cfg.HistogramWindowInterval(),
536+
&execCfg,
534537
)
535538
s.registry.AddMetricStruct(s.pgServer.Metrics())
539+
if s.cfg.UseFrontendV2 {
540+
s.registry.AddMetricStruct(s.pgServer.StatementCounters())
541+
s.registry.AddMetricStruct(s.pgServer.EngineMetrics())
542+
}
536543

537544
sqlExecutor.ExecCfg = &execCfg
538545
s.execCfg = &execCfg
@@ -1166,6 +1173,7 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
11661173

11671174
s.sqlExecutor.Start(ctx, s.execCfg.DistSQLPlanner)
11681175
s.distSQLServer.Start()
1176+
s.pgServer.Start(ctx, s.stopper)
11691177

11701178
s.serveMode.set(modeOperational)
11711179

@@ -1268,7 +1276,13 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
12681276
connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
12691277
setTCPKeepAlive(connCtx, conn)
12701278

1271-
if err := s.pgServer.ServeConn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
1279+
var serveFn func(ctx context.Context, conn net.Conn) error
1280+
if s.cfg.UseFrontendV2 {
1281+
serveFn = s.pgServer.ServeConn2
1282+
} else {
1283+
serveFn = s.pgServer.ServeConn
1284+
}
1285+
if err := serveFn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
12721286
// Report the error on this connection's context, so that we
12731287
// know which remote client caused the error when looking at
12741288
// the logs.
@@ -1340,6 +1354,7 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
13401354
func (s *Server) doDrain(
13411355
ctx context.Context, modes []serverpb.DrainMode, setTo bool,
13421356
) ([]serverpb.DrainMode, error) {
1357+
log.Infof(ctx, "!!! Server.doDrain")
13431358
for _, mode := range modes {
13441359
switch mode {
13451360
case serverpb.DrainMode_CLIENT:

pkg/server/updates.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,13 @@ func (s *Server) maybeReportDiagnostics(
242242
if log.DiagnosticsReportingEnabled.Get(&s.st.SV) && diagnosticsMetricsEnabled.Get(&s.st.SV) {
243243
s.reportDiagnostics(running)
244244
}
245-
s.sqlExecutor.ResetStatementStats(ctx)
246-
s.sqlExecutor.ResetUnimplementedCounts()
245+
if s.cfg.UseFrontendV2 {
246+
s.pgServer.SQLServer.ResetStatementStats(ctx)
247+
s.pgServer.SQLServer.ResetUnimplementedCounts()
248+
} else {
249+
s.sqlExecutor.ResetStatementStats(ctx)
250+
s.sqlExecutor.ResetUnimplementedCounts()
251+
}
247252

248253
return scheduled.Add(diagnosticReportFrequency.Get(&s.st.SV))
249254
}
@@ -280,7 +285,6 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
280285
schema = nil
281286
}
282287
info.Schema = schema
283-
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
284288
info.UnimplementedErrors = make(map[string]int64)
285289

286290
// Read the system.settings table to determine the settings for which we have
@@ -318,7 +322,15 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
318322
}
319323
}
320324

321-
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
325+
if s.cfg.UseFrontendV2 {
326+
log.Infof(context.TODO(), "!!! server asking v2 for stats")
327+
info.SqlStats = s.pgServer.SQLServer.GetScrubbedStmtStats()
328+
s.pgServer.SQLServer.FillUnimplementedErrorCounts(info.UnimplementedErrors)
329+
} else {
330+
log.Infof(context.TODO(), "!!! server asking v1 for stats")
331+
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
332+
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
333+
}
322334
return &info
323335
}
324336

pkg/sql/app_stats.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func anonymizeStmt(stmt Statement) string {
165165
}
166166

167167
// sqlStats carries per-application statistics for all applications on
168-
// each node. It hangs off Executor.
168+
// each node.
169169
type sqlStats struct {
170170
st *cluster.Settings
171171
syncutil.Mutex
@@ -281,14 +281,20 @@ func scrubStmtStatKey(vt VirtualTabler, key string) (string, bool) {
281281
// queries scrubbed of their identifiers. Any statements which cannot be
282282
// scrubbed will be omitted from the returned map.
283283
func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics {
284+
return e.sqlStats.getScrubbedStmtStats(e.cfg.VirtualSchemas)
285+
}
286+
287+
func (s *sqlStats) getScrubbedStmtStats(
288+
vt *VirtualSchemaHolder,
289+
) []roachpb.CollectedStatementStatistics {
290+
s.Lock()
291+
defer s.Unlock()
284292
var ret []roachpb.CollectedStatementStatistics
285-
vt := e.cfg.VirtualSchemas
286-
e.sqlStats.Lock()
287-
salt := ClusterSecret.Get(&e.cfg.Settings.SV)
288-
for appName, a := range e.sqlStats.apps {
293+
salt := ClusterSecret.Get(&s.st.SV)
294+
for appName, a := range s.apps {
289295
if cap(ret) == 0 {
290296
// guesstimate that we'll need apps*(queries-per-app).
291-
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(e.sqlStats.apps))
297+
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(s.apps))
292298
}
293299
hashedAppName := HashForReporting(salt, appName)
294300
a.Lock()
@@ -315,7 +321,6 @@ func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics
315321
}
316322
a.Unlock()
317323
}
318-
e.sqlStats.Unlock()
319324
return ret
320325
}
321326

pkg/sql/builtin_mem_usage_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3030
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3131
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
32+
"github.com/cockroachdb/cockroach/pkg/util/log"
3233
)
3334

3435
// lowMemoryBudget is the memory budget used to test builtins are recording
@@ -59,6 +60,7 @@ CREATE TABLE d.t (a STRING)
5960
}
6061

6162
for i := 0; i < numRows; i++ {
63+
log.Infof(context.TODO(), "!!! test sending insert: %d", i)
6264
if _, err := sqlDB.Exec(`INSERT INTO d.t VALUES (REPEAT('a', $1))`, rowSize); err != nil {
6365
return err
6466
}

pkg/sql/cancel_query.go

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
2424
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2525
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
26+
"github.com/cockroachdb/cockroach/pkg/util/log"
2627
"github.com/cockroachdb/cockroach/pkg/util/uint128"
2728
)
2829

@@ -56,6 +57,7 @@ func (n *cancelQueryNode) startExec(params runParams) error {
5657
if err != nil {
5758
return err
5859
}
60+
log.Infof(params.ctx, "!!! cancel queryID: %s", queryIDDatum)
5961

6062
queryIDString := tree.AsStringWithFlags(queryIDDatum, tree.FmtBareStrings)
6163
queryID, err := uint128.FromString(queryIDString)

0 commit comments

Comments
 (0)