Skip to content

Commit f551c46

Browse files
authored
Merge pull request #22277 from andreimatei/executor-state-machine-conn-executor
sql: introduce connExecutor, the query execution orchestrator
2 parents 0286431 + 7e38855 commit f551c46

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+5444
-931
lines changed

pkg/gossip/gossip.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -957,8 +957,8 @@ func (g *Gossip) GetSystemConfig() (config.SystemConfig, bool) {
957957
}
958958

959959
// RegisterSystemConfigChannel registers a channel to signify updates for the
960-
// system config. It is notified after registration, and whenever a new
961-
// system config is successfully unmarshaled.
960+
// system config. It is notified after registration (if a system config is
961+
// already set), and whenever a new system config is successfully unmarshaled.
962962
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
963963
g.systemConfigMu.Lock()
964964
defer g.systemConfigMu.Unlock()

pkg/server/admin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *adminServer) NewContextAndSessionForRPC(
157157
) (context.Context, *sql.Session) {
158158
ctx = s.server.AnnotateCtx(ctx)
159159
session := sql.NewSession(
160-
ctx, args, s.server.sqlExecutor, nil /* remote */, s.memMetrics, nil /* conn */)
160+
ctx, args, s.server.sqlExecutor, s.memMetrics, nil /* conn */)
161161
session.StartMonitor(&s.memMonitor, mon.BoundAccount{})
162162
return ctx, session
163163
}

pkg/server/admin_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func TestAdminAPIDatabases(t *testing.T) {
240240
const testdb = "test"
241241
session := sql.NewSession(
242242
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
243-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
243+
&sql.MemoryMetrics{}, nil /* conn */)
244244
session.StartUnlimitedMonitor()
245245
defer session.Finish(ts.sqlExecutor)
246246
query := "CREATE DATABASE " + testdb
@@ -412,7 +412,7 @@ func TestAdminAPITableDetails(t *testing.T) {
412412

413413
session := sql.NewSession(
414414
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
415-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
415+
&sql.MemoryMetrics{}, nil /* conn */)
416416
session.StartUnlimitedMonitor()
417417
defer session.Finish(ts.sqlExecutor)
418418
setupQueries := []string{
@@ -557,7 +557,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {
557557
defer span.Finish()
558558
session := sql.NewSession(
559559
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
560-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
560+
&sql.MemoryMetrics{}, nil /* conn */)
561561
session.StartUnlimitedMonitor()
562562
setupQueries := []string{
563563
"CREATE DATABASE test",
@@ -668,7 +668,7 @@ func TestAdminAPIUsers(t *testing.T) {
668668
defer span.Finish()
669669
session := sql.NewSession(
670670
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
671-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
671+
&sql.MemoryMetrics{}, nil /* conn */)
672672
session.StartUnlimitedMonitor()
673673
defer session.Finish(ts.sqlExecutor)
674674
query := `
@@ -713,7 +713,7 @@ func TestAdminAPIEvents(t *testing.T) {
713713
defer span.Finish()
714714
session := sql.NewSession(
715715
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
716-
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
716+
&sql.MemoryMetrics{}, nil /* conn */)
717717
session.StartUnlimitedMonitor()
718718
defer session.Finish(ts.sqlExecutor)
719719
setupQueries := []string{

pkg/server/authentication.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (s *authenticationServer) verifyPassword(
238238
ctx context.Context, username string, password string,
239239
) (bool, error) {
240240
exists, hashedPassword, err := sql.GetUserHashedPassword(
241-
ctx, s.server.sqlExecutor, s.memMetrics, username,
241+
ctx, s.server.execCfg, s.memMetrics, username,
242242
)
243243
if err != nil {
244244
return false, err

pkg/server/config.go

+10
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,13 @@ type Config struct {
234234
// the Admin API's HTTP endpoints.
235235
EnableWebSessionAuthentication bool
236236

237+
// UseLegacyConnHandling, if set, makes the Server use the old code for
238+
// handling pgwire connections.
239+
//
240+
// TODO(andrei): remove this once the code for the old v3Conn and Executor is
241+
// deleted.
242+
UseLegacyConnHandling bool
243+
237244
enginesCreated bool
238245
}
239246

@@ -377,6 +384,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
377384
cfg.Config.InitDefaults()
378385
cfg.RaftConfig.SetDefaults()
379386
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()
387+
380388
return cfg
381389
}
382390

@@ -589,6 +597,8 @@ func (cfg *Config) readEnvironmentVariables() {
589597
cfg.Linearizable = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_LINEARIZABLE", cfg.Linearizable)
590598
cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
591599
cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
600+
cfg.UseLegacyConnHandling = envutil.EnvOrDefaultBool(
601+
"COCKROACH_USE_LEGACY_CONN_HANDLING", false)
592602
}
593603

594604
// parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers.

pkg/server/server.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
521521
execCfg.EvalContextTestingKnobs = *sqlEvalContext.(*tree.EvalContextTestingKnobs)
522522
}
523523
s.sqlExecutor = sql.NewExecutor(execCfg, s.stopper)
524-
s.registry.AddMetricStruct(s.sqlExecutor)
524+
if s.cfg.UseLegacyConnHandling {
525+
s.registry.AddMetricStruct(s.sqlExecutor)
526+
}
525527

526528
s.pgServer = pgwire.MakeServer(
527529
s.cfg.AmbientCtx,
@@ -531,8 +533,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
531533
&s.internalMemMetrics,
532534
&rootSQLMemoryMonitor,
533535
s.cfg.HistogramWindowInterval(),
536+
&execCfg,
534537
)
535538
s.registry.AddMetricStruct(s.pgServer.Metrics())
539+
if !s.cfg.UseLegacyConnHandling {
540+
s.registry.AddMetricStruct(s.pgServer.StatementCounters())
541+
s.registry.AddMetricStruct(s.pgServer.EngineMetrics())
542+
}
536543

537544
sqlExecutor.ExecCfg = &execCfg
538545
s.execCfg = &execCfg
@@ -1166,6 +1173,7 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
11661173

11671174
s.sqlExecutor.Start(ctx, s.execCfg.DistSQLPlanner)
11681175
s.distSQLServer.Start()
1176+
s.pgServer.Start(ctx, s.stopper)
11691177

11701178
s.serveMode.set(modeOperational)
11711179

@@ -1268,7 +1276,13 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
12681276
connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
12691277
setTCPKeepAlive(connCtx, conn)
12701278

1271-
if err := s.pgServer.ServeConn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
1279+
var serveFn func(ctx context.Context, conn net.Conn) error
1280+
if !s.cfg.UseLegacyConnHandling {
1281+
serveFn = s.pgServer.ServeConn2
1282+
} else {
1283+
serveFn = s.pgServer.ServeConn
1284+
}
1285+
if err := serveFn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
12721286
// Report the error on this connection's context, so that we
12731287
// know which remote client caused the error when looking at
12741288
// the logs.

pkg/server/updates.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,13 @@ func (s *Server) maybeReportDiagnostics(
242242
if log.DiagnosticsReportingEnabled.Get(&s.st.SV) && diagnosticsMetricsEnabled.Get(&s.st.SV) {
243243
s.reportDiagnostics(running)
244244
}
245-
s.sqlExecutor.ResetStatementStats(ctx)
246-
s.sqlExecutor.ResetUnimplementedCounts()
245+
if !s.cfg.UseLegacyConnHandling {
246+
s.pgServer.SQLServer.ResetStatementStats(ctx)
247+
s.pgServer.SQLServer.ResetUnimplementedCounts()
248+
} else {
249+
s.sqlExecutor.ResetStatementStats(ctx)
250+
s.sqlExecutor.ResetUnimplementedCounts()
251+
}
247252

248253
return scheduled.Add(diagnosticReportFrequency.Get(&s.st.SV))
249254
}
@@ -280,7 +285,6 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
280285
schema = nil
281286
}
282287
info.Schema = schema
283-
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
284288
info.UnimplementedErrors = make(map[string]int64)
285289

286290
// Read the system.settings table to determine the settings for which we have
@@ -318,7 +322,13 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
318322
}
319323
}
320324

321-
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
325+
if !s.cfg.UseLegacyConnHandling {
326+
info.SqlStats = s.pgServer.SQLServer.GetScrubbedStmtStats()
327+
s.pgServer.SQLServer.FillUnimplementedErrorCounts(info.UnimplementedErrors)
328+
} else {
329+
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
330+
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
331+
}
322332
return &info
323333
}
324334

pkg/sql/app_stats.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func anonymizeStmt(stmt Statement) string {
165165
}
166166

167167
// sqlStats carries per-application statistics for all applications on
168-
// each node. It hangs off Executor.
168+
// each node.
169169
type sqlStats struct {
170170
st *cluster.Settings
171171
syncutil.Mutex
@@ -281,14 +281,20 @@ func scrubStmtStatKey(vt VirtualTabler, key string) (string, bool) {
281281
// queries scrubbed of their identifiers. Any statements which cannot be
282282
// scrubbed will be omitted from the returned map.
283283
func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics {
284+
return e.sqlStats.getScrubbedStmtStats(e.cfg.VirtualSchemas)
285+
}
286+
287+
func (s *sqlStats) getScrubbedStmtStats(
288+
vt *VirtualSchemaHolder,
289+
) []roachpb.CollectedStatementStatistics {
290+
s.Lock()
291+
defer s.Unlock()
284292
var ret []roachpb.CollectedStatementStatistics
285-
vt := e.cfg.VirtualSchemas
286-
e.sqlStats.Lock()
287-
salt := ClusterSecret.Get(&e.cfg.Settings.SV)
288-
for appName, a := range e.sqlStats.apps {
293+
salt := ClusterSecret.Get(&s.st.SV)
294+
for appName, a := range s.apps {
289295
if cap(ret) == 0 {
290296
// guesstimate that we'll need apps*(queries-per-app).
291-
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(e.sqlStats.apps))
297+
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(s.apps))
292298
}
293299
hashedAppName := HashForReporting(salt, appName)
294300
a.Lock()
@@ -315,7 +321,6 @@ func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics
315321
}
316322
a.Unlock()
317323
}
318-
e.sqlStats.Unlock()
319324
return ret
320325
}
321326

0 commit comments

Comments
 (0)