From 31f800f6a2721bbb95305c4a2f49bc3ef77c2d14 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 5 Mar 2025 12:20:42 -0800 Subject: [PATCH] server/context.go: Add a method on SessionManager which allows an integrator to wait for all client connections to drain. --- server/context.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/context.go b/server/context.go index 9cd4a48db5..aa4d187d8f 100644 --- a/server/context.go +++ b/server/context.go @@ -49,6 +49,10 @@ type SessionManager struct { connections map[uint32]*mysql.Conn lastPid uint64 ctxFactory sql.ContextFactory + // Implements WaitForClosedConnections(), which is only used + // at server shutdown to allow the integrator to ensure that + // no connections are being handled by handlers. + wg sync.WaitGroup } // NewSessionManager creates a SessionManager with the given SessionBuilder. @@ -82,6 +86,13 @@ func (s *SessionManager) nextPid() uint64 { return s.lastPid } +// Block the calling thread until all known connections are closed. It +// is an error to call this concurrently while the server might still +// be accepting new connections. +func (s *SessionManager) WaitForClosedConnections() { + s.wg.Wait() +} + // AddConn adds a connection to be tracked by the SessionManager. Should be called as // soon as possible after the server has accepted the connection. Results in // the connection being tracked by ProcessList and being available through @@ -93,6 +104,7 @@ func (s *SessionManager) AddConn(conn *mysql.Conn) { defer s.mu.Unlock() s.connections[conn.ConnectionID] = conn s.processlist.AddConnection(conn.ConnectionID, conn.RemoteAddr().String()) + s.wg.Add(1) } // NewSession creates a Session for the given connection and saves it to the session pool. @@ -270,6 +282,7 @@ func (s *SessionManager) KillConnection(connID uint32) error { func (s *SessionManager) RemoveConn(conn *mysql.Conn) { s.mu.Lock() defer s.mu.Unlock() + s.wg.Done() if cur, ok := s.sessions[conn.ConnectionID]; ok { sql.SessionEnd(cur) }