Skip to content

Commit 7e38855

Browse files
committed
sql: introduce connExecutor, the query execution orchestrator
Release note: None The connExecutor is the top dog that interfaces with a pgwire connection (through the clientComm and CommandResult interfaces), consumes a stream of queries and produces a stream of results. It encapsulates the connection state machine for which it produces events. It interfaces with the two SQL execution engines for actually running queries. Its main responsibility is to dispatch statements based on the current state (in txn, not in txn, in aborted txn, etc) and to handle execution in all states but Open (in Open it talks to an execution engine). It also handles other commands than executiong queries: preparing, binding, etc and it maintains the session state associated with prepares statements.
1 parent f9f3d43 commit 7e38855

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+5444
-931
lines changed

pkg/gossip/gossip.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -957,8 +957,8 @@ func (g *Gossip) GetSystemConfig() (config.SystemConfig, bool) {
957957
}
958958

959959
// RegisterSystemConfigChannel registers a channel to signify updates for the
960-
// system config. It is notified after registration, and whenever a new
961-
// system config is successfully unmarshaled.
960+
// system config. It is notified after registration (if a system config is
961+
// already set), and whenever a new system config is successfully unmarshaled.
962962
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
963963
g.systemConfigMu.Lock()
964964
defer g.systemConfigMu.Unlock()

pkg/server/admin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *adminServer) NewContextAndSessionForRPC(
157157
) (context.Context, *sql.Session) {
158158
ctx = s.server.AnnotateCtx(ctx)
159159
session := sql.NewSession(
160-
ctx, args, s.server.sqlExecutor, nil /* remote */, s.memMetrics, nil /* conn */)
160+
ctx, args, s.server.sqlExecutor, s.memMetrics, nil /* conn */)
161161
session.StartMonitor(&s.memMonitor, mon.BoundAccount{})
162162
return ctx, session
163163
}

pkg/server/admin_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func TestAdminAPIDatabases(t *testing.T) {
240240
const testdb = "test"
241241
session := sql.NewSession(
242242
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
243-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
243+
&sql.MemoryMetrics{}, nil /* conn */)
244244
session.StartUnlimitedMonitor()
245245
defer session.Finish(ts.sqlExecutor)
246246
query := "CREATE DATABASE " + testdb
@@ -412,7 +412,7 @@ func TestAdminAPITableDetails(t *testing.T) {
412412

413413
session := sql.NewSession(
414414
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
415-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
415+
&sql.MemoryMetrics{}, nil /* conn */)
416416
session.StartUnlimitedMonitor()
417417
defer session.Finish(ts.sqlExecutor)
418418
setupQueries := []string{
@@ -557,7 +557,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {
557557
defer span.Finish()
558558
session := sql.NewSession(
559559
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
560-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
560+
&sql.MemoryMetrics{}, nil /* conn */)
561561
session.StartUnlimitedMonitor()
562562
setupQueries := []string{
563563
"CREATE DATABASE test",
@@ -668,7 +668,7 @@ func TestAdminAPIUsers(t *testing.T) {
668668
defer span.Finish()
669669
session := sql.NewSession(
670670
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
671-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
671+
&sql.MemoryMetrics{}, nil /* conn */)
672672
session.StartUnlimitedMonitor()
673673
defer session.Finish(ts.sqlExecutor)
674674
query := `
@@ -713,7 +713,7 @@ func TestAdminAPIEvents(t *testing.T) {
713713
defer span.Finish()
714714
session := sql.NewSession(
715715
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
716-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
716+
&sql.MemoryMetrics{}, nil /* conn */)
717717
session.StartUnlimitedMonitor()
718718
defer session.Finish(ts.sqlExecutor)
719719
setupQueries := []string{

pkg/server/authentication.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (s *authenticationServer) verifyPassword(
238238
ctx context.Context, username string, password string,
239239
) (bool, error) {
240240
exists, hashedPassword, err := sql.GetUserHashedPassword(
241-
ctx, s.server.sqlExecutor, s.memMetrics, username,
241+
ctx, s.server.execCfg, s.memMetrics, username,
242242
)
243243
if err != nil {
244244
return false, err

pkg/server/config.go

+10
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,13 @@ type Config struct {
234234
// the Admin API's HTTP endpoints.
235235
EnableWebSessionAuthentication bool
236236

237+
// UseLegacyConnHandling, if set, makes the Server use the old code for
238+
// handling pgwire connections.
239+
//
240+
// TODO(andrei): remove this once the code for the old v3Conn and Executor is
241+
// deleted.
242+
UseLegacyConnHandling bool
243+
237244
enginesCreated bool
238245
}
239246

@@ -377,6 +384,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
377384
cfg.Config.InitDefaults()
378385
cfg.RaftConfig.SetDefaults()
379386
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()
387+
380388
return cfg
381389
}
382390

@@ -589,6 +597,8 @@ func (cfg *Config) readEnvironmentVariables() {
589597
cfg.Linearizable = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_LINEARIZABLE", cfg.Linearizable)
590598
cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
591599
cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
600+
cfg.UseLegacyConnHandling = envutil.EnvOrDefaultBool(
601+
"COCKROACH_USE_LEGACY_CONN_HANDLING", false)
592602
}
593603

594604
// parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers.

pkg/server/server.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
521521
execCfg.EvalContextTestingKnobs = *sqlEvalContext.(*tree.EvalContextTestingKnobs)
522522
}
523523
s.sqlExecutor = sql.NewExecutor(execCfg, s.stopper)
524-
s.registry.AddMetricStruct(s.sqlExecutor)
524+
if s.cfg.UseLegacyConnHandling {
525+
s.registry.AddMetricStruct(s.sqlExecutor)
526+
}
525527

526528
s.pgServer = pgwire.MakeServer(
527529
s.cfg.AmbientCtx,
@@ -531,8 +533,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
531533
&s.internalMemMetrics,
532534
&rootSQLMemoryMonitor,
533535
s.cfg.HistogramWindowInterval(),
536+
&execCfg,
534537
)
535538
s.registry.AddMetricStruct(s.pgServer.Metrics())
539+
if !s.cfg.UseLegacyConnHandling {
540+
s.registry.AddMetricStruct(s.pgServer.StatementCounters())
541+
s.registry.AddMetricStruct(s.pgServer.EngineMetrics())
542+
}
536543

537544
sqlExecutor.ExecCfg = &execCfg
538545
s.execCfg = &execCfg
@@ -1166,6 +1173,7 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
11661173

11671174
s.sqlExecutor.Start(ctx, s.execCfg.DistSQLPlanner)
11681175
s.distSQLServer.Start()
1176+
s.pgServer.Start(ctx, s.stopper)
11691177

11701178
s.serveMode.set(modeOperational)
11711179

@@ -1268,7 +1276,13 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
12681276
connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
12691277
setTCPKeepAlive(connCtx, conn)
12701278

1271-
if err := s.pgServer.ServeConn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
1279+
var serveFn func(ctx context.Context, conn net.Conn) error
1280+
if !s.cfg.UseLegacyConnHandling {
1281+
serveFn = s.pgServer.ServeConn2
1282+
} else {
1283+
serveFn = s.pgServer.ServeConn
1284+
}
1285+
if err := serveFn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
12721286
// Report the error on this connection's context, so that we
12731287
// know which remote client caused the error when looking at
12741288
// the logs.

pkg/server/updates.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,13 @@ func (s *Server) maybeReportDiagnostics(
242242
if log.DiagnosticsReportingEnabled.Get(&s.st.SV) && diagnosticsMetricsEnabled.Get(&s.st.SV) {
243243
s.reportDiagnostics(running)
244244
}
245-
s.sqlExecutor.ResetStatementStats(ctx)
246-
s.sqlExecutor.ResetUnimplementedCounts()
245+
if !s.cfg.UseLegacyConnHandling {
246+
s.pgServer.SQLServer.ResetStatementStats(ctx)
247+
s.pgServer.SQLServer.ResetUnimplementedCounts()
248+
} else {
249+
s.sqlExecutor.ResetStatementStats(ctx)
250+
s.sqlExecutor.ResetUnimplementedCounts()
251+
}
247252

248253
return scheduled.Add(diagnosticReportFrequency.Get(&s.st.SV))
249254
}
@@ -280,7 +285,6 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
280285
schema = nil
281286
}
282287
info.Schema = schema
283-
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
284288
info.UnimplementedErrors = make(map[string]int64)
285289

286290
// Read the system.settings table to determine the settings for which we have
@@ -318,7 +322,13 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
318322
}
319323
}
320324

321-
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
325+
if !s.cfg.UseLegacyConnHandling {
326+
info.SqlStats = s.pgServer.SQLServer.GetScrubbedStmtStats()
327+
s.pgServer.SQLServer.FillUnimplementedErrorCounts(info.UnimplementedErrors)
328+
} else {
329+
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
330+
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
331+
}
322332
return &info
323333
}
324334

pkg/sql/app_stats.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func anonymizeStmt(stmt Statement) string {
165165
}
166166

167167
// sqlStats carries per-application statistics for all applications on
168-
// each node. It hangs off Executor.
168+
// each node.
169169
type sqlStats struct {
170170
st *cluster.Settings
171171
syncutil.Mutex
@@ -281,14 +281,20 @@ func scrubStmtStatKey(vt VirtualTabler, key string) (string, bool) {
281281
// queries scrubbed of their identifiers. Any statements which cannot be
282282
// scrubbed will be omitted from the returned map.
283283
func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics {
284+
return e.sqlStats.getScrubbedStmtStats(e.cfg.VirtualSchemas)
285+
}
286+
287+
func (s *sqlStats) getScrubbedStmtStats(
288+
vt *VirtualSchemaHolder,
289+
) []roachpb.CollectedStatementStatistics {
290+
s.Lock()
291+
defer s.Unlock()
284292
var ret []roachpb.CollectedStatementStatistics
285-
vt := e.cfg.VirtualSchemas
286-
e.sqlStats.Lock()
287-
salt := ClusterSecret.Get(&e.cfg.Settings.SV)
288-
for appName, a := range e.sqlStats.apps {
293+
salt := ClusterSecret.Get(&s.st.SV)
294+
for appName, a := range s.apps {
289295
if cap(ret) == 0 {
290296
// guesstimate that we'll need apps*(queries-per-app).
291-
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(e.sqlStats.apps))
297+
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(s.apps))
292298
}
293299
hashedAppName := HashForReporting(salt, appName)
294300
a.Lock()
@@ -315,7 +321,6 @@ func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics
315321
}
316322
a.Unlock()
317323
}
318-
e.sqlStats.Unlock()
319324
return ret
320325
}
321326

0 commit comments

Comments
 (0)