diff --git a/connection_pool.go b/connection_pool.go index c41645ca..5e82305a 100644 --- a/connection_pool.go +++ b/connection_pool.go @@ -27,7 +27,7 @@ type ConnectionPool struct { hostIndex int log Logger rwLock sync.RWMutex - cleanerChan chan struct{} //notify when pool is close + cleanerChan chan struct{} // notify when pool is close closed bool sslConfig *tls.Config } @@ -111,15 +111,14 @@ func (pool *ConnectionPool) GetSession(username, password string) (*Session, err resp, err := conn.authenticate(username, password) if err != nil { // if authentication failed, put connection back - pool.rwLock.Lock() - defer pool.rwLock.Unlock() - removeFromList(&pool.activeConnectionQueue, conn) - pool.idleConnectionQueue.PushBack(conn) + pool.pushBack(conn) return nil, err } // Check auth response if resp.GetErrorCode() != nebula.ErrorCode_SUCCEEDED { + // if authentication responded with unsuccessful code, put connection back + pool.pushBack(conn) return nil, fmt.Errorf("failed to authenticate, error code: %d, error msg: %s", resp.GetErrorCode(), resp.GetErrorMsg()) } @@ -147,7 +146,7 @@ func (pool *ConnectionPool) getIdleConn() (*connection, error) { if pool.idleConnectionQueue.Len() > 0 { var newConn *connection = nil var newEle *list.Element = nil - var tmpNextEle *list.Element = nil + var tmpNextEle *list.Element for ele := pool.idleConnectionQueue.Front(); ele != nil; ele = tmpNextEle { // Check if connection is valid if res := ele.Value.(*connection).ping(); res { @@ -177,19 +176,26 @@ func (pool *ConnectionPool) getIdleConn() (*connection, error) { // Release connection to pool func (pool *ConnectionPool) release(conn *connection) { - pool.releaseAndBack(conn, true) + pool.deactivate(conn, true, true) +} + +func (pool *ConnectionPool) pushBack(conn *connection) { + pool.deactivate(conn, true, false) } -func (pool *ConnectionPool) releaseAndBack(conn *connection, pushBack bool) { +// Deactivate connection. Add to idleConnectionQueue if pushBack is true. +// Release connection if release is true. +func (pool *ConnectionPool) deactivate(conn *connection, pushBack, release bool) { pool.rwLock.Lock() defer pool.rwLock.Unlock() // Remove connection from active queue and add into idle queue removeFromList(&pool.activeConnectionQueue, conn) - if !pushBack { - return + if release { + conn.release() + } + if pushBack { + pool.idleConnectionQueue.PushBack(conn) } - conn.release() - pool.idleConnectionQueue.PushBack(conn) } // Ping checks availability of host @@ -202,7 +208,7 @@ func (pool *ConnectionPool) Close() { pool.rwLock.Lock() defer pool.rwLock.Unlock() - //TODO(Aiee) merge 2 lists and close all connections + // TODO(Aiee) merge 2 lists and close all connections idleLen := pool.idleConnectionQueue.Len() activeLen := pool.activeConnectionQueue.Len() @@ -324,10 +330,9 @@ func (pool *ConnectionPool) connectionCleaner() { } func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) { - if pool.conf.IdleTime > 0 { expiredSince := time.Now().Add(-pool.conf.IdleTime) - var newEle *list.Element = nil + var newEle *list.Element maxCleanSize := pool.idleConnectionQueue.Len() + pool.activeConnectionQueue.Len() - pool.conf.MinConnPoolSize @@ -353,9 +358,11 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) { // checkAddresses checks addresses availability // It opens a temporary connection to each address and closes it immediately. // If no error is returned, the addresses are available. -func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config, - useHTTP2 bool, httpHeader http.Header, handshakeKey string) error { - var timeout = 3 * time.Second +func checkAddresses( + confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config, + useHTTP2 bool, httpHeader http.Header, handshakeKey string, +) error { + timeout := 3 * time.Second if confTimeout != 0 && confTimeout < timeout { timeout = confTimeout } @@ -367,8 +374,10 @@ func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfi return nil } -func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config, - useHTTP2 bool, httpHeader http.Header, handshakeKey string) error { +func pingAddress( + address HostAddress, timeout time.Duration, sslConfig *tls.Config, + useHTTP2 bool, httpHeader http.Header, handshakeKey string, +) error { newConn := newConnection(address) // Open connection to host if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, handshakeKey); err != nil { diff --git a/session.go b/session.go index 408cd6a8..3ceccd61 100644 --- a/session.go +++ b/session.go @@ -49,7 +49,6 @@ func (session *Session) executeWithReconnect(f func() (interface{}, error)) (int } // Execute with the new connection return f() - } // ExecuteWithParameter returns the result of the given query as a ResultSet @@ -81,7 +80,6 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]inte return nil, err } return resp.(*ResultSet), err - } // Execute returns the result of the given query as a ResultSet @@ -219,7 +217,7 @@ func (session *Session) CreateSpace(conf SpaceConf) (*ResultSet, error) { conf.VidType = "FIXED_STRING(8)" } - q := "" + var q string if conf.IgnoreIfExists { q = fmt.Sprintf( "CREATE SPACE IF NOT EXISTS %s (partition_num = %d, replica_factor = %d, vid_type = %s)", @@ -264,7 +262,7 @@ func (session *Session) reConnect() error { return err } - session.connPool.releaseAndBack(session.connection, false) + session.connPool.deactivate(session.connection, false, false) session.connection = newConnection return nil }