Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: introduce connExecutor, the query execution orchestrator #22277

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,8 @@ func (g *Gossip) GetSystemConfig() (config.SystemConfig, bool) {
}

// RegisterSystemConfigChannel registers a channel to signify updates for the
// system config. It is notified after registration, and whenever a new
// system config is successfully unmarshaled.
// system config. It is notified after registration (if a system config is
// already set), and whenever a new system config is successfully unmarshaled.
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
g.systemConfigMu.Lock()
defer g.systemConfigMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *adminServer) NewContextAndSessionForRPC(
) (context.Context, *sql.Session) {
ctx = s.server.AnnotateCtx(ctx)
session := sql.NewSession(
ctx, args, s.server.sqlExecutor, nil /* remote */, s.memMetrics, nil /* conn */)
ctx, args, s.server.sqlExecutor, s.memMetrics, nil /* conn */)
session.StartMonitor(&s.memMonitor, mon.BoundAccount{})
return ctx, session
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestAdminAPIDatabases(t *testing.T) {
const testdb = "test"
session := sql.NewSession(
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
&sql.MemoryMetrics{}, nil /* conn */)
session.StartUnlimitedMonitor()
defer session.Finish(ts.sqlExecutor)
query := "CREATE DATABASE " + testdb
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestAdminAPITableDetails(t *testing.T) {

session := sql.NewSession(
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
&sql.MemoryMetrics{}, nil /* conn */)
session.StartUnlimitedMonitor()
defer session.Finish(ts.sqlExecutor)
setupQueries := []string{
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {
defer span.Finish()
session := sql.NewSession(
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
&sql.MemoryMetrics{}, nil /* conn */)
session.StartUnlimitedMonitor()
setupQueries := []string{
"CREATE DATABASE test",
Expand Down Expand Up @@ -668,7 +668,7 @@ func TestAdminAPIUsers(t *testing.T) {
defer span.Finish()
session := sql.NewSession(
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
&sql.MemoryMetrics{}, nil /* conn */)
session.StartUnlimitedMonitor()
defer session.Finish(ts.sqlExecutor)
query := `
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestAdminAPIEvents(t *testing.T) {
defer span.Finish()
session := sql.NewSession(
ctx, sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor,
nil /* remote */, &sql.MemoryMetrics{}, nil /* conn */)
&sql.MemoryMetrics{}, nil /* conn */)
session.StartUnlimitedMonitor()
defer session.Finish(ts.sqlExecutor)
setupQueries := []string{
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *authenticationServer) verifyPassword(
ctx context.Context, username string, password string,
) (bool, error) {
exists, hashedPassword, err := sql.GetUserHashedPassword(
ctx, s.server.sqlExecutor, s.memMetrics, username,
ctx, s.server.execCfg, s.memMetrics, username,
)
if err != nil {
return false, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ type Config struct {
// the Admin API's HTTP endpoints.
EnableWebSessionAuthentication bool

// UseLegacyConnHandling, if set, makes the Server use the old code for
// handling pgwire connections.
//
// TODO(andrei): remove this once the code for the old v3Conn and Executor is
// deleted.
UseLegacyConnHandling bool

enginesCreated bool
}

Expand Down Expand Up @@ -377,6 +384,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
cfg.Config.InitDefaults()
cfg.RaftConfig.SetDefaults()
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()

return cfg
}

Expand Down Expand Up @@ -589,6 +597,8 @@ func (cfg *Config) readEnvironmentVariables() {
cfg.Linearizable = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_LINEARIZABLE", cfg.Linearizable)
cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
cfg.UseLegacyConnHandling = envutil.EnvOrDefaultBool(
"COCKROACH_USE_LEGACY_CONN_HANDLING", false)
}

// parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers.
Expand Down
18 changes: 16 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
execCfg.EvalContextTestingKnobs = *sqlEvalContext.(*tree.EvalContextTestingKnobs)
}
s.sqlExecutor = sql.NewExecutor(execCfg, s.stopper)
s.registry.AddMetricStruct(s.sqlExecutor)
if s.cfg.UseLegacyConnHandling {
s.registry.AddMetricStruct(s.sqlExecutor)
}

s.pgServer = pgwire.MakeServer(
s.cfg.AmbientCtx,
Expand All @@ -531,8 +533,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
&s.internalMemMetrics,
&rootSQLMemoryMonitor,
s.cfg.HistogramWindowInterval(),
&execCfg,
)
s.registry.AddMetricStruct(s.pgServer.Metrics())
if !s.cfg.UseLegacyConnHandling {
s.registry.AddMetricStruct(s.pgServer.StatementCounters())
s.registry.AddMetricStruct(s.pgServer.EngineMetrics())
}

sqlExecutor.ExecCfg = &execCfg
s.execCfg = &execCfg
Expand Down Expand Up @@ -1166,6 +1173,7 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.

s.sqlExecutor.Start(ctx, s.execCfg.DistSQLPlanner)
s.distSQLServer.Start()
s.pgServer.Start(ctx, s.stopper)

s.serveMode.set(modeOperational)

Expand Down Expand Up @@ -1268,7 +1276,13 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
setTCPKeepAlive(connCtx, conn)

if err := s.pgServer.ServeConn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
var serveFn func(ctx context.Context, conn net.Conn) error
if !s.cfg.UseLegacyConnHandling {
serveFn = s.pgServer.ServeConn2
} else {
serveFn = s.pgServer.ServeConn
}
if err := serveFn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
// Report the error on this connection's context, so that we
// know which remote client caused the error when looking at
// the logs.
Expand Down
18 changes: 14 additions & 4 deletions pkg/server/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,13 @@ func (s *Server) maybeReportDiagnostics(
if log.DiagnosticsReportingEnabled.Get(&s.st.SV) && diagnosticsMetricsEnabled.Get(&s.st.SV) {
s.reportDiagnostics(running)
}
s.sqlExecutor.ResetStatementStats(ctx)
s.sqlExecutor.ResetUnimplementedCounts()
if !s.cfg.UseLegacyConnHandling {
s.pgServer.SQLServer.ResetStatementStats(ctx)
s.pgServer.SQLServer.ResetUnimplementedCounts()
} else {
s.sqlExecutor.ResetStatementStats(ctx)
s.sqlExecutor.ResetUnimplementedCounts()
}

return scheduled.Add(diagnosticReportFrequency.Get(&s.st.SV))
}
Expand Down Expand Up @@ -280,7 +285,6 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
schema = nil
}
info.Schema = schema
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
info.UnimplementedErrors = make(map[string]int64)

// Read the system.settings table to determine the settings for which we have
Expand Down Expand Up @@ -318,7 +322,13 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
}
}

s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
if !s.cfg.UseLegacyConnHandling {
info.SqlStats = s.pgServer.SQLServer.GetScrubbedStmtStats()
s.pgServer.SQLServer.FillUnimplementedErrorCounts(info.UnimplementedErrors)
} else {
info.SqlStats = s.sqlExecutor.GetScrubbedStmtStats()
s.sqlExecutor.FillUnimplementedErrorCounts(info.UnimplementedErrors)
}
return &info
}

Expand Down
19 changes: 12 additions & 7 deletions pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func anonymizeStmt(stmt Statement) string {
}

// sqlStats carries per-application statistics for all applications on
// each node. It hangs off Executor.
// each node.
type sqlStats struct {
st *cluster.Settings
syncutil.Mutex
Expand Down Expand Up @@ -281,14 +281,20 @@ func scrubStmtStatKey(vt VirtualTabler, key string) (string, bool) {
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics {
return e.sqlStats.getScrubbedStmtStats(e.cfg.VirtualSchemas)
}

func (s *sqlStats) getScrubbedStmtStats(
vt *VirtualSchemaHolder,
) []roachpb.CollectedStatementStatistics {
s.Lock()
defer s.Unlock()
var ret []roachpb.CollectedStatementStatistics
vt := e.cfg.VirtualSchemas
e.sqlStats.Lock()
salt := ClusterSecret.Get(&e.cfg.Settings.SV)
for appName, a := range e.sqlStats.apps {
salt := ClusterSecret.Get(&s.st.SV)
for appName, a := range s.apps {
if cap(ret) == 0 {
// guesstimate that we'll need apps*(queries-per-app).
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(e.sqlStats.apps))
ret = make([]roachpb.CollectedStatementStatistics, 0, len(a.stmts)*len(s.apps))
}
hashedAppName := HashForReporting(salt, appName)
a.Lock()
Expand All @@ -315,7 +321,6 @@ func (e *Executor) GetScrubbedStmtStats() []roachpb.CollectedStatementStatistics
}
a.Unlock()
}
e.sqlStats.Unlock()
return ret
}

Expand Down
Loading