From 22b5a58fa472cbcb45ad3af529114341f6cd9a04 Mon Sep 17 00:00:00 2001
From: Simon Richardson
Date: Wed, 15 Jan 2025 16:58:26 +0000
Subject: [PATCH] feat: introduce pool unused limit

When acquiring a socket from the pool, the server first checks whether
any previously released (unused) sockets are available and reuses one
for the connection. During a burst of acquisitions, every socket is
placed back in the unused pool on release. This creates a high
watermark that never goes back down, short of restarting the
application. The existing pool limit could be used to prevent the
excess sockets, though it requires the application to handle load
shedding via the errors returned once that limit is reached. The new
pool unused limit instead bounds how many idle sockets are kept for
reuse, closing any excess as they are released.
---
 cluster.go |  6 ++---
 server.go  | 24 ++++++++++++++-----
 session.go | 68 +++++++++++++++++++++++++++++++++++++++---------------
 socket.go  |  4 ++--
 4 files changed, 72 insertions(+), 30 deletions(-)

diff --git a/cluster.go b/cluster.go
index 9287c68f..43694e71 100644
--- a/cluster.go
+++ b/cluster.go
@@ -194,7 +194,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
 			continue
 		}
 		err = cluster.isMaster(socket, &result)
-		socket.Release()
+		socket.Release(0)
 		if err != nil {
 			tryerr = err
 			logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
@@ -583,7 +583,7 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
 // AcquireSocket returns a socket to a server in the cluster. If slaveOk is
 // true, it will attempt to return a socket to a slave server. If it is
 // false, the socket will necessarily be to a master server.
-func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
+func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit, poolUnusedLimit int) (s *mongoSocket, err error) {
 	var started time.Time
 	var syncCount uint
 	warnedLimit := false
@@ -647,7 +647,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
 			err := cluster.isMaster(s, &result)
 			if err != nil || !result.IsMaster {
 				logf("Cannot confirm server %s as master (%v)", server.Addr, err)
				s.Release(poolUnusedLimit)
 				cluster.syncServers()
 				time.Sleep(100 * time.Millisecond)
 				continue
diff --git a/server.go b/server.go
index 71919ce9..b38d6b37 100644
--- a/server.go
+++ b/server.go
@@ -133,7 +133,7 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (
 		// closed in the meantime
 		if server.closed {
 			server.Unlock()
-			socket.Release()
+			socket.Release(0)
 			socket.Close()
 			return nil, abended, errServerClosed
 		}
@@ -205,12 +205,26 @@ func (server *mongoServer) Close() {
 }
 
 // RecycleSocket puts socket back into the unused cache.
-func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
+func (server *mongoServer) RecycleSocket(socket *mongoSocket, poolUnusedLimit int) {
 	server.Lock()
-	if !server.closed {
-		server.unusedSockets = append(server.unusedSockets, socket)
+	defer server.Unlock()
+
+	if server.closed {
+		return
 	}
-	server.Unlock()
+
+	// If the number of unused sockets is too high, we just close this one.
+	// This won't close existing connections, and it's possible that we could
+	// still end up with a high watermark of unused sockets, but it should lead
+	// to a reduction if there is a sustained period of low usage.
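+	// For example, with a limit of 2 and two sockets already cached, the
+	// next socket released here is closed instead of being cached.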
+	if poolUnusedLimit > 0 && len(server.unusedSockets) >= poolUnusedLimit {
+		socket.Close()
+		return
+	}
+
+	server.unusedSockets = append(server.unusedSockets, socket)
 }
 
 func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
@@ -319,7 +333,7 @@ func (server *mongoServer) pinger(loop bool) {
 			max = server.pingWindow[i]
 		}
 	}
-	socket.Release()
+	socket.Release(0)
 	server.Lock()
 	if server.closed {
 		loop = false
diff --git a/session.go b/session.go
index 9f326d93..e7ca1be7 100644
--- a/session.go
+++ b/session.go
@@ -90,6 +90,7 @@ type Session struct {
 	dialCred         *Credential
 	creds            []Credential
 	poolLimit        int
+	poolUnusedLimit  int
 	bypassValidation bool
 	sessionId        bson.Binary
 	nextTransactionNumber int64
@@ -415,6 +416,10 @@ type DialInfo struct {
 	// See Session.SetPoolLimit for details.
 	PoolLimit int
 
+	// PoolUnusedLimit defines the maximum number of unused (idle) sockets
+	// kept in the pool. Defaults to 0, meaning no limit.
+	PoolUnusedLimit int
+
 	// DialServer optionally specifies the dial function for establishing
 	// connections with the MongoDB servers.
 	DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -491,6 +496,9 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
 	if info.PoolLimit > 0 {
 		session.poolLimit = info.PoolLimit
 	}
+	if info.PoolUnusedLimit > 0 {
+		session.poolUnusedLimit = info.PoolUnusedLimit
+	}
 	cluster.Release()
 
 	// People get confused when we return a session that is not actually
@@ -674,22 +682,21 @@ func (db *Database) GridFS(prefix string) *GridFS {
 // use an ordering-preserving document, such as a struct value or an
 // instance of bson.D. For instance:
 //
-//     db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
+//	db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
 //
-// For privilleged commands typically run on the "admin" database, see
+// For privileged commands typically run on the "admin" database, see
 // the Run method in the Session type.
 //
 // Relevant documentation:
 //
-//     http://www.mongodb.org/display/DOCS/Commands
-//     http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
-//
+//	http://www.mongodb.org/display/DOCS/Commands
+//	http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
 func (db *Database) Run(cmd interface{}, result interface{}) error {
 	socket, err := db.Session.acquireSocket(true)
 	if err != nil {
 		return err
 	}
-	defer socket.Release()
+	defer socket.Release(0)
 
 	// This is an optimized form of db.C("$cmd").Find(cmd).One(result).
 	return db.run(socket, cmd, result)
@@ -738,7 +745,7 @@ func (s *Session) Login(cred *Credential) error {
 	if err != nil {
 		return err
 	}
-	defer socket.Release()
+	defer socket.Release(0)
 
 	credCopy := *cred
 	if cred.Source == "" {
@@ -1804,6 +1811,16 @@ func (s *Session) SetPoolLimit(limit int) {
 	s.m.Unlock()
 }
 
+// SetPoolUnusedLimit sets the maximum number of unused sockets kept in the
+// pool. Once that many are cached, released sockets are closed instead of
+// being returned to the pool. The default is 0, meaning unused sockets are
+// never closed by this limit.
+func (s *Session) SetPoolUnusedLimit(limit int) {
+	s.m.Lock()
+	s.poolUnusedLimit = limit
+	s.m.Unlock()
+}
+
 // SetBypassValidation sets whether the server should bypass the registered
 // validation expressions executed when documents are inserted or modified,
 // in the interest of preserving invariants in the collection being modified.
@@ -3219,13 +3236,14 @@ func (q *Query) One(result interface{}) (err error) {
 	session.m.RLock()
 	txn := session.transaction
 	startTxn := txn != nil && !txn.started
+	poolUnusedLimit := session.poolUnusedLimit
 	session.m.RUnlock()
 
 	socket, err := session.acquireSocket(true)
 	if err != nil {
 		return err
 	}
-	defer socket.Release()
+	defer socket.Release(poolUnusedLimit)
 
 	op.limit = -1
 
@@ -3593,6 +3611,7 @@ func (q *Query) Iter() *Iter {
 	session.m.RLock()
 	txn := session.transaction
 	startTxn := txn != nil && !txn.started
+	poolUnusedLimit := session.poolUnusedLimit
 	session.m.RUnlock()
 
 	iter := &Iter{
@@ -3612,7 +3631,7 @@ func (q *Query) Iter() *Iter {
 		iter.err = err
 		return iter
 	}
-	defer socket.Release()
+	defer socket.Release(poolUnusedLimit)
 
 	session.prepareQuery(&op)
 	op.replyFunc = iter.op.replyFunc
@@ -3690,6 +3709,9 @@ func (q *Query) Tail(timeout time.Duration) *Iter {
 	session := q.session
 	op := q.op
 	prefetch := q.prefetch
 	q.m.Unlock()
+	session.m.RLock()
+	poolUnusedLimit := session.poolUnusedLimit
+	session.m.RUnlock()
 
 	iter := &Iter{session: session, prefetch: prefetch}
@@ -3715,7 +3737,7 @@
 			iter.err = err
 			iter.m.Unlock()
 		}
-		socket.Release()
+		socket.Release(poolUnusedLimit)
 	}
 	return iter
 }
@@ -3777,7 +3799,10 @@ func (iter *Iter) Close() error {
 	if err == nil {
 		// TODO Batch kills.
 		err = socket.Query(&killCursorsOp{[]int64{cursorId}})
-		socket.Release()
+		iter.session.m.RLock()
+		poolUnusedLimit := iter.session.poolUnusedLimit
+		iter.session.m.RUnlock()
+		socket.Release(poolUnusedLimit)
 	}
 
 	iter.m.Lock()
@@ -4040,15 +4065,16 @@ func (iter *Iter) acquireSocket() (*mongoSocket, error) {
 		// to primary. Our cursor is in a specific server, though.
 		iter.session.m.Lock()
 		sockTimeout := iter.session.sockTimeout
+		poolUnusedLimit := iter.session.poolUnusedLimit
 		iter.session.m.Unlock()
-		socket.Release()
+		socket.Release(poolUnusedLimit)
 		socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
 		if err != nil {
 			return nil, err
 		}
 		err := iter.session.socketLogin(socket)
 		if err != nil {
-			socket.Release()
+			socket.Release(poolUnusedLimit)
 			return nil, err
 		}
 	}
@@ -4066,7 +4092,10 @@ func (iter *Iter) getMore() {
 		iter.err = err
 		return
 	}
-	defer socket.Release()
+	iter.session.m.RLock()
+	poolUnusedLimit := iter.session.poolUnusedLimit
+	iter.session.m.RUnlock()
+	defer socket.Release(poolUnusedLimit)
 
 	debugf("Iter %p requesting more documents", iter)
 	if iter.limit > 0 {
@@ -4611,14 +4640,14 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
 	}
 
 	// Still not good. We need a new socket.
-	sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
+	sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolUnusedLimit)
 	if err != nil {
 		return nil, err
 	}
 
 	// Authenticate the new socket.
 	if err = s.socketLogin(sock); err != nil {
-		sock.Release()
+		sock.Release(s.poolUnusedLimit)
 		return nil, err
 	}
 
@@ -4657,7 +4686,7 @@ func (s *Session) handleNotPrimaryWrite(on *mongoSocket) {
 	s.m.RUnlock()
 	s.m.Lock()
 	if s.masterSocket == on {
-		s.masterSocket.Release()
+		s.masterSocket.Release(s.poolUnusedLimit)
 		s.masterSocket = nil
 	}
 	s.m.Unlock()
@@ -4687,11 +4716,13 @@
 // unsetSocket releases any slave and/or master sockets reserved.
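+// Callers invoke unsetSocket with s.m already write-locked, so the unused
+// limit is read directly below; taking s.m.RLock here would deadlock.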
 func (s *Session) unsetSocket() {
 	if s.masterSocket != nil {
-		s.masterSocket.Release()
+		s.masterSocket.Release(s.poolUnusedLimit)
 	}
 	if s.slaveSocket != nil {
-		s.slaveSocket.Release()
+		s.slaveSocket.Release(s.poolUnusedLimit)
 	}
 	s.masterSocket = nil
 	s.slaveSocket = nil
@@ -4808,7 +4839,10 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
 	if err != nil {
 		return nil, err
 	}
-	defer socket.Release()
+	s.m.RLock()
+	poolUnusedLimit := s.poolUnusedLimit
+	s.m.RUnlock()
+	defer socket.Release(poolUnusedLimit)
 	defer func() {
 		if IsNotPrimaryError(err) {
 			s.handleNotPrimaryWrite(socket)
diff --git a/socket.go b/socket.go
index 0df7939a..d792f618 100644
--- a/socket.go
+++ b/socket.go
@@ -255,7 +255,7 @@ func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {
 
 // Release decrements a socket reference. The socket will be
-// recycled once its released as many times as it's been acquired.
-func (socket *mongoSocket) Release() {
+// recycled once it's released as many times as it's been acquired.
+func (socket *mongoSocket) Release(poolUnusedLimit int) {
 	socket.Lock()
 	if socket.references == 0 {
 		panic("socket.Release() with references == 0")
@@ -269,7 +269,7 @@
 		socket.LogoutAll()
 		// If the socket is dead server is nil.
 		if server != nil {
-			server.RecycleSocket(socket)
+			server.RecycleSocket(socket, poolUnusedLimit)
 		}
 	} else {
 		socket.Unlock()
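
Usage sketch (illustrative, not part of the patch): the new limit can be set
either up front via DialInfo or later on a live session. The import path,
server address, and limit values below are assumptions; adjust them to the
fork carrying this change.

	package main

	import (
		"log"
		"time"

		mgo "gopkg.in/mgo.v2" // assumed import path
	)

	func main() {
		info := &mgo.DialInfo{
			Addrs:   []string{"localhost:27017"}, // illustrative address
			Timeout: 10 * time.Second,
			// Keep at most 16 idle sockets cached per server; sockets
			// released beyond that are closed instead of pooled.
			PoolUnusedLimit: 16,
		}
		session, err := mgo.DialWithInfo(info)
		if err != nil {
			log.Fatal(err)
		}
		defer session.Close()

		// The limit can also be adjusted on a live session.
		session.SetPoolUnusedLimit(32)
	}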