Skip to content

Commit

Permalink
feat: put conn to idle if auth responded with unsuccessful code (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomCN0803 authored Apr 8, 2024
1 parent 9a0ce41 commit e7925a2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
49 changes: 29 additions & 20 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ConnectionPool struct {
hostIndex int
log Logger
rwLock sync.RWMutex
cleanerChan chan struct{} //notify when pool is close
cleanerChan chan struct{} // notify when pool is close
closed bool
sslConfig *tls.Config
}
Expand Down Expand Up @@ -111,15 +111,14 @@ func (pool *ConnectionPool) GetSession(username, password string) (*Session, err
resp, err := conn.authenticate(username, password)
if err != nil {
// if authentication failed, put connection back
pool.rwLock.Lock()
defer pool.rwLock.Unlock()
removeFromList(&pool.activeConnectionQueue, conn)
pool.idleConnectionQueue.PushBack(conn)
pool.pushBack(conn)
return nil, err
}

// Check auth response
if resp.GetErrorCode() != nebula.ErrorCode_SUCCEEDED {
// if authentication responded with unsuccessful code, put connection back
pool.pushBack(conn)
return nil, fmt.Errorf("failed to authenticate, error code: %d, error msg: %s",
resp.GetErrorCode(), resp.GetErrorMsg())
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func (pool *ConnectionPool) getIdleConn() (*connection, error) {
if pool.idleConnectionQueue.Len() > 0 {
var newConn *connection = nil
var newEle *list.Element = nil
var tmpNextEle *list.Element = nil
var tmpNextEle *list.Element
for ele := pool.idleConnectionQueue.Front(); ele != nil; ele = tmpNextEle {
// Check if connection is valid
if res := ele.Value.(*connection).ping(); res {
Expand Down Expand Up @@ -177,19 +176,26 @@ func (pool *ConnectionPool) getIdleConn() (*connection, error) {

// Release connection to pool
func (pool *ConnectionPool) release(conn *connection) {
pool.releaseAndBack(conn, true)
pool.deactivate(conn, true, true)
}

func (pool *ConnectionPool) pushBack(conn *connection) {
pool.deactivate(conn, true, false)
}

func (pool *ConnectionPool) releaseAndBack(conn *connection, pushBack bool) {
// Deactivate connection. Add to idleConnectionQueue if pushBack is true.
// Release connection if release is true.
func (pool *ConnectionPool) deactivate(conn *connection, pushBack, release bool) {
pool.rwLock.Lock()
defer pool.rwLock.Unlock()
// Remove connection from active queue and add into idle queue
removeFromList(&pool.activeConnectionQueue, conn)
if !pushBack {
return
if release {
conn.release()
}
if pushBack {
pool.idleConnectionQueue.PushBack(conn)
}
conn.release()
pool.idleConnectionQueue.PushBack(conn)
}

// Ping checks availability of host
Expand All @@ -202,7 +208,7 @@ func (pool *ConnectionPool) Close() {
pool.rwLock.Lock()
defer pool.rwLock.Unlock()

//TODO(Aiee) merge 2 lists and close all connections
// TODO(Aiee) merge 2 lists and close all connections
idleLen := pool.idleConnectionQueue.Len()
activeLen := pool.activeConnectionQueue.Len()

Expand Down Expand Up @@ -324,10 +330,9 @@ func (pool *ConnectionPool) connectionCleaner() {
}

func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {

if pool.conf.IdleTime > 0 {
expiredSince := time.Now().Add(-pool.conf.IdleTime)
var newEle *list.Element = nil
var newEle *list.Element

maxCleanSize := pool.idleConnectionQueue.Len() + pool.activeConnectionQueue.Len() - pool.conf.MinConnPoolSize

Expand All @@ -353,9 +358,11 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
// checkAddresses checks addresses availability
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, handshakeKey string) error {
var timeout = 3 * time.Second
func checkAddresses(
confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, handshakeKey string,
) error {
timeout := 3 * time.Second
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
Expand All @@ -367,8 +374,10 @@ func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfi
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, handshakeKey string) error {
func pingAddress(
address HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, handshakeKey string,
) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, handshakeKey); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (session *Session) executeWithReconnect(f func() (interface{}, error)) (int
}
// Execute with the new connection
return f()

}

// ExecuteWithParameter returns the result of the given query as a ResultSet
Expand Down Expand Up @@ -81,7 +80,6 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]inte
return nil, err
}
return resp.(*ResultSet), err

}

// Execute returns the result of the given query as a ResultSet
Expand Down Expand Up @@ -219,7 +217,7 @@ func (session *Session) CreateSpace(conf SpaceConf) (*ResultSet, error) {
conf.VidType = "FIXED_STRING(8)"
}

q := ""
var q string
if conf.IgnoreIfExists {
q = fmt.Sprintf(
"CREATE SPACE IF NOT EXISTS %s (partition_num = %d, replica_factor = %d, vid_type = %s)",
Expand Down Expand Up @@ -264,7 +262,7 @@ func (session *Session) reConnect() error {
return err
}

session.connPool.releaseAndBack(session.connection, false)
session.connPool.deactivate(session.connection, false, false)
session.connection = newConnection
return nil
}
Expand Down

0 comments on commit e7925a2

Please sign in to comment.