Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: reduce contention on SessionRegistry mutex #95553

Merged
merged 1 commit into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ go_library(
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/tracingservicepb",
"//pkg/util/tracing/tracingui",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cenkalti_backoff//:backoff",
"@com_github_cockroachdb_apd_v3//:apd",
Expand Down
39 changes: 25 additions & 14 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand Down Expand Up @@ -212,30 +213,27 @@ func (b *baseStatusServer) getLocalSessions(
showAll := reqUsername.Undefined()
showInternal := SQLStatsShowInternal.Get(&b.st.SV) || req.IncludeInternal

// In order to avoid duplicate sessions showing up as both open and closed,
// we lock the session registry to prevent any changes to it while we
// serialize the sessions from the session registry and the closed session
// cache.
b.sessionRegistry.Lock()
sessions := b.sessionRegistry.SerializeAllLocked()

sessions := b.sessionRegistry.SerializeAll()
var closedSessions []serverpb.Session
var closedSessionIDs map[uint128.Uint128]struct{}
if !req.ExcludeClosedSessions {
closedSessions = b.closedSessionCache.GetSerializedSessions()
closedSessionIDs = make(map[uint128.Uint128]struct{}, len(closedSessions))
for _, closedSession := range closedSessions {
closedSessionIDs[uint128.FromBytes(closedSession.ID)] = struct{}{}
}
}
b.sessionRegistry.Unlock()

userSessions := make([]serverpb.Session, 0)
sessions = append(sessions, closedSessions...)

reqUserNameNormalized := reqUsername.Normalized()
for _, session := range sessions {

userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions))
addUserSession := func(session serverpb.Session) {
// We filter based on the session name instead of the executor type because we
// may want to surface certain internal sessions, such as those executed by
// the SQL over HTTP api, as non-internal.
if (reqUserNameNormalized != session.Username && !showAll) ||
(!showInternal && isInternalAppName(session.ApplicationName)) {
continue
return
}

if !isAdmin && hasViewActivityRedacted && (reqUserNameNormalized != session.Username) {
Expand All @@ -247,9 +245,22 @@ func (b *baseStatusServer) getLocalSessions(
}
session.LastActiveQuery = session.LastActiveQueryNoConstants
}

userSessions = append(userSessions, session)
}
for _, session := range sessions {
// The same session can appear as both open and closed because reading the
// open and closed sessions is not synchronized. Prefer the closed session
// over the open one if the same session appears as both because it was
// closed in between reading the open sessions and reading the closed ones.
_, ok := closedSessionIDs[uint128.FromBytes(session.ID)]
if ok {
continue
}
addUserSession(session)
}
for _, session := range closedSessions {
addUserSession(session)
}

sort.Slice(userSessions, func(i, j int) bool {
return userSessions[i].Start.Before(userSessions[j].Start)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func TestQueryProgress(t *testing.T) {
// stalled ch as expected.
defer func() {
select {
case <-stalled: //stalled was closed as expected.
case <-stalled: // stalled was closed as expected.
default:
panic("expected stalled to have been closed during execution")
}
Expand Down
112 changes: 60 additions & 52 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ type ExecutorConfig struct {

// RootMemoryMonitor is the root memory monitor of the entire server. Do not
// use this for normal purposes. It is to be used to establish any new
// root-level memory accounts that are not related to a user sessions.
// root-level memory accounts that are not related to a user session.
RootMemoryMonitor *mon.BytesMonitor

// CompactEngineSpanFunc is used to inform a storage engine of the need to
Expand Down Expand Up @@ -2062,36 +2062,64 @@ type SessionArgs struct {
// SessionRegistry stores a set of all sessions on this node.
// Use register() and deregister() to modify this registry.
type SessionRegistry struct {
syncutil.Mutex
sessions map[clusterunique.ID]registrySession
sessionsByCancelKey map[pgwirecancel.BackendKeyData]registrySession
mu struct {
syncutil.RWMutex
sessionsByID map[clusterunique.ID]registrySession
sessionsByCancelKey map[pgwirecancel.BackendKeyData]registrySession
}
}

// NewSessionRegistry creates a new SessionRegistry with an empty set
// of sessions.
func NewSessionRegistry() *SessionRegistry {
return &SessionRegistry{
sessions: make(map[clusterunique.ID]registrySession),
sessionsByCancelKey: make(map[pgwirecancel.BackendKeyData]registrySession),
r := SessionRegistry{}
r.mu.sessionsByID = make(map[clusterunique.ID]registrySession)
r.mu.sessionsByCancelKey = make(map[pgwirecancel.BackendKeyData]registrySession)
return &r
}

func (r *SessionRegistry) getSessionByID(id clusterunique.ID) (registrySession, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
session, ok := r.mu.sessionsByID[id]
return session, ok
}

func (r *SessionRegistry) getSessionByCancelKey(
cancelKey pgwirecancel.BackendKeyData,
) (registrySession, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
session, ok := r.mu.sessionsByCancelKey[cancelKey]
return session, ok
}

func (r *SessionRegistry) getSessions() []registrySession {
r.mu.RLock()
defer r.mu.RUnlock()
sessions := make([]registrySession, 0, len(r.mu.sessionsByID))
for _, session := range r.mu.sessionsByID {
sessions = append(sessions, session)
}
return sessions
}

func (r *SessionRegistry) register(
id clusterunique.ID, queryCancelKey pgwirecancel.BackendKeyData, s registrySession,
) {
r.Lock()
defer r.Unlock()
r.sessions[id] = s
r.sessionsByCancelKey[queryCancelKey] = s
r.mu.Lock()
defer r.mu.Unlock()
r.mu.sessionsByID[id] = s
r.mu.sessionsByCancelKey[queryCancelKey] = s
}

func (r *SessionRegistry) deregister(
id clusterunique.ID, queryCancelKey pgwirecancel.BackendKeyData,
) {
r.Lock()
defer r.Unlock()
delete(r.sessions, id)
delete(r.sessionsByCancelKey, queryCancelKey)
r.mu.Lock()
defer r.mu.Unlock()
delete(r.mu.sessionsByID, id)
delete(r.mu.sessionsByCancelKey, queryCancelKey)
}

type registrySession interface {
Expand All @@ -2112,10 +2140,7 @@ func (r *SessionRegistry) CancelQuery(queryIDStr string) (bool, error) {
return false, errors.Wrapf(err, "query ID %s malformed", queryID)
}

r.Lock()
defer r.Unlock()

for _, session := range r.sessions {
for _, session := range r.getSessions() {
if session.cancelQuery(queryID) {
return true, nil
}
Expand All @@ -2129,15 +2154,11 @@ func (r *SessionRegistry) CancelQuery(queryIDStr string) (bool, error) {
func (r *SessionRegistry) CancelQueryByKey(
queryCancelKey pgwirecancel.BackendKeyData,
) (canceled bool, err error) {
r.Lock()
defer r.Unlock()
if session, ok := r.sessionsByCancelKey[queryCancelKey]; ok {
if session.cancelCurrentQueries() {
return true, nil
}
return false, nil
session, ok := r.getSessionByCancelKey(queryCancelKey)
if !ok {
return false, fmt.Errorf("session for cancel key %d not found", queryCancelKey)
}
return false, fmt.Errorf("session for cancel key %d not found", queryCancelKey)
return session.cancelCurrentQueries(), nil
}

// CancelSession looks up the specified session in the session registry and
Expand All @@ -2150,37 +2171,24 @@ func (r *SessionRegistry) CancelSession(
}
sessionID := clusterunique.IDFromBytes(sessionIDBytes)

r.Lock()
defer r.Unlock()

for id, session := range r.sessions {
if id == sessionID {
session.cancelSession()
return &serverpb.CancelSessionResponse{Canceled: true}, nil
}
session, ok := r.getSessionByID(sessionID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now uses the map to look up the session instead of scanning.

if !ok {
return &serverpb.CancelSessionResponse{
Error: fmt.Sprintf("session ID %s not found", sessionID),
}, nil
}

return &serverpb.CancelSessionResponse{
Error: fmt.Sprintf("session ID %s not found", sessionID),
}, nil
session.cancelSession()
return &serverpb.CancelSessionResponse{Canceled: true}, nil
}

// SerializeAll returns a slice of all sessions in the registry, converted to serverpb.Sessions.
// SerializeAll returns a slice of all sessions in the registry converted to
// serverpb.Sessions.
func (r *SessionRegistry) SerializeAll() []serverpb.Session {
r.Lock()
defer r.Unlock()

return r.SerializeAllLocked()
}

// SerializeAllLocked is like SerializeAll but assumes SessionRegistry's mutex is locked.
func (r *SessionRegistry) SerializeAllLocked() []serverpb.Session {
response := make([]serverpb.Session, 0, len(r.sessions))

for _, s := range r.sessions {
sessions := r.getSessions()
response := make([]serverpb.Session, 0, len(sessions))
for _, s := range sessions {
response = append(response, s.serialize())
}

return response
}

Expand Down