Skip to content

Commit

Permalink
Auto-reconnect on broken pipe or non-leader query, auto-retry in case…
Browse files Browse the repository at this point in the history
… an complete, non-deserializable response
  • Loading branch information
radekg committed Jan 10, 2022
1 parent 76f9c19 commit c9e2b44
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 24 deletions.
172 changes: 155 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package client

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/hashicorp/go-hclog"
"github.com/radekg/yugabyte-db-go-client/configs"
clientErrors "github.com/radekg/yugabyte-db-go-client/errors"
"google.golang.org/protobuf/reflect/protoreflect"

ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
Expand All @@ -25,9 +28,11 @@ type YBClient interface {
}

var (
errAlreadyConnected = fmt.Errorf("client: already connected")
errConnecting = fmt.Errorf("client: connecting")
errNoClient = fmt.Errorf("client: no client")
errAlreadyConnected = fmt.Errorf("client: already connected")
errConnecting = fmt.Errorf("client: connecting")
errLeaderWaitTimeout = fmt.Errorf("client: leader wait timed out")
errNoClient = fmt.Errorf("client: no client")
errNotReconnected = fmt.Errorf("client: reconnect failed")
)

type defaultYBClient struct {
Expand All @@ -41,7 +46,7 @@ type defaultYBClient struct {
// NewYBClient constructs a new instance of the high-level YugabyteDB client.
func NewYBClient(config *configs.YBClientConfig, logger hclog.Logger) YBClient {
return &defaultYBClient{
config: config,
config: config.WithDefaults(),
lock: &sync.Mutex{},
logger: logger,
}
Expand All @@ -50,14 +55,10 @@ func NewYBClient(config *configs.YBClientConfig, logger hclog.Logger) YBClient {
func (c *defaultYBClient) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.connectedClient == nil {
return errNoClient
}

closeError := c.connectedClient.Close()
c.connectedClient = nil
return closeError
return c.closeUnsafe()
}

func (c *defaultYBClient) Connect() error {
Expand All @@ -72,6 +73,145 @@ func (c *defaultYBClient) Connect() error {
return errAlreadyConnected
}

return c.connectUnsafe()
}

func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.connectedClient == nil {
return errNoClient
}

currentAttempt := int32(1)

for {

executeErr := c.connectedClient.Execute(payload, response)

// the response might have an error in it, check if this is a response returning ybApi.MasterErrorPB
if tResponse, ok := response.(clientErrors.AbstractMasterErrorResponse); ok {
// was there an error in that response?
if masterError := clientErrors.NewMasterError(tResponse.GetError()); masterError != nil {
// we know it was not nil because masterError was not nil
responseError := tResponse.GetError()
if responseError.Code != nil {
responseErrorCode := *responseError.Code
if int32(responseErrorCode.Number()) == int32(ybApi.MasterErrorPB_NOT_THE_LEADER.Number()) {
c.logger.Warn("execute: response with NOT_THE_LEADER master status code, reconnect", "reason", masterError)
executeErr = &clientErrors.RequiresReconnectError{
Cause: masterError,
}
}
}
}
}

if executeErr == nil {
return nil
}

if c.config.MaxExecuteRetries <= configs.NoExecuteRetry {
reportErr := executeErr
if tReconnectError, ok := executeErr.(*clientErrors.RequiresReconnectError); ok {
reportErr = tReconnectError.Cause
}
c.logger.Error("execute: retry disabled, not retrying", "reason", reportErr)
return reportErr
}

if currentAttempt > c.config.MaxExecuteRetries {
reportErr := executeErr
if tReconnectError, ok := executeErr.(*clientErrors.RequiresReconnectError); ok {
reportErr = tReconnectError.Cause
}
c.logger.Error("execute: failed for a maximum number of allowed attempts, giving up", "reason", reportErr)
return reportErr
}

// broken pipe qualifies for immediate retry:
if errors.Is(executeErr, syscall.EPIPE) {
executeErr = &clientErrors.RequiresReconnectError{
Cause: executeErr,
}
}

if _, ok := executeErr.(*clientErrors.UnprocessableResponseError); ok {
// complete payload has been read from the server
// but payload could not be deserialized as protobuf,
// this qualifies for immediate retry:
currentAttempt = currentAttempt + 1
<-time.After(c.config.RetryInterval)
continue
}

if tReconnectError, ok := executeErr.(*clientErrors.RequiresReconnectError); ok {

if c.config.MaxReconnectAttempts <= configs.NoReconnectAttempts {
c.logger.Error("execute: not reconnecting after error, max reconnect attempts not set",
"reason", tReconnectError.Cause)
return tReconnectError.Cause
}

c.logger.Debug("execute: attempting reconnect due to an error",
"reason", tReconnectError.Cause)

// reconnect:
currentReconnectAttempt := int32(1)
reconnected := false
for {

reconnectErr := c.reconnect()

if reconnectErr == nil {
reconnected = true
break
}

if currentReconnectAttempt == c.config.MaxReconnectAttempts {
break
}

c.logger.Error("execute: failed reconnect",
"attempt", currentReconnectAttempt,
"max-attempts", c.config.MaxReconnectAttempts,
"reason", reconnectErr)

currentReconnectAttempt = currentReconnectAttempt + 1
<-time.After(c.config.ReconnectRetryInterval)

}

if !reconnected {
c.logger.Error("execute: failed reconnect consecutive maximum reconnect attempts",
"max-attempts", c.config.MaxReconnectAttempts,
"reason", tReconnectError.Cause)
return errNotReconnected
}

// retry:
<-time.After(c.config.RetryInterval)
currentAttempt = currentAttempt + 1
continue

} // reconnect handling / end

// in case of any other error, no recovery:
return executeErr

}

}

func (c *defaultYBClient) closeUnsafe() error {
closeError := c.connectedClient.Close()
c.connectedClient = nil
return closeError
}

func (c *defaultYBClient) connectUnsafe() error {

tlsConfig, err := c.config.TLSConfig()
if err != nil {
return err
Expand Down Expand Up @@ -163,25 +303,23 @@ func (c *defaultYBClient) Connect() error {
atomic.AddUint64(&done, 1)
if atomic.LoadUint64(&done) == max {
c.isConnecting = false
return fmt.Errorf("no reachable leaders")
return &clientErrors.NoLeaderError{}
}
case connectedClient := <-chanConnectedClient:
c.connectedClient = connectedClient
c.isConnecting = false
return nil
case <-time.After(c.config.OpTimeout):
c.isConnecting = false
return fmt.Errorf("failed to connect to a leader master within timeout")
return errLeaderWaitTimeout
}
}

}

func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) error {
c.lock.Lock()
defer c.lock.Unlock()
if c.connectedClient == nil {
return errNoClient
func (c *defaultYBClient) reconnect() error {
if err := c.closeUnsafe(); err != nil {
return err
}
return c.connectedClient.Execute(payload, response)
return c.connectUnsafe()
}
6 changes: 4 additions & 2 deletions client/single_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,10 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto

protoErr2 := utils.DeserializeProto(responsePayloadBuf, m)
if protoErr2 != nil {
opLogger.Error("failed unmarshalling response payload", "reason", protoErr2, "consumed-data", string(responsePayloadBuf))
return err
return &errors.UnprocessableResponseError{
Cause: protoErr2,
ConsumedPayload: responsePayloadBuf,
}
}

return nil
Expand Down
52 changes: 47 additions & 5 deletions configs/ybclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,57 @@ type YBSingleNodeClientConfig struct {
OpTimeout uint32
}

const (
// DefaultMaxExecuteRetries is the default maximum number of retries for a failed execute.
DefaultMaxExecuteRetries int32 = 10
// DefaultMaxReconnectAttempts is the default max reconnect attempts value.
DefaultMaxReconnectAttempts int32 = 10
// DefaultOpTimeout is the default operation timeout value.
DefaultOpTimeout = time.Second * 60
// DefaultReconnectRetryInterval is the default reconnect retry interval value.
DefaultReconnectRetryInterval = time.Second
// DefaultRetryInterval is the default retry interval value.
DefaultRetryInterval = time.Second

// NoExecuteRetry is a magic value disabling retry of failed execute.
NoExecuteRetry int32 = -1
// NoReconnectAttempts is a magic value disabling reconnect attempts.
NoReconnectAttempts int32 = -1
)

// YBClientConfig represents the shared CLI config.
type YBClientConfig struct {
tlsConfig *tls.Config

MasterHostPort []string
OpTimeout time.Duration
TLSCaCertFilePath string
TLSCertFilePath string
TLSKeyFilePath string
MasterHostPort []string
OpTimeout time.Duration
MaxExecuteRetries int32
MaxReconnectAttempts int32
ReconnectRetryInterval time.Duration
RetryInterval time.Duration
TLSCaCertFilePath string
TLSCertFilePath string
TLSKeyFilePath string
}

// WithDefaults applies defaults to unset values.
func (c *YBClientConfig) WithDefaults() *YBClientConfig {
if c.MaxExecuteRetries == 0 {
c.MaxExecuteRetries = DefaultMaxExecuteRetries
}
if c.MaxReconnectAttempts == 0 {
c.MaxReconnectAttempts = DefaultMaxReconnectAttempts
}
if c.OpTimeout == 0 {
c.OpTimeout = DefaultOpTimeout
}
if c.ReconnectRetryInterval == 0 {
c.ReconnectRetryInterval = DefaultReconnectRetryInterval
}
if c.RetryInterval == 0 {
c.RetryInterval = DefaultRetryInterval
}
return c
}

// TLSConfig returns TLS config if TLS is configured.
Expand Down
49 changes: 49 additions & 0 deletions errors/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package errors

import (
"fmt"

"github.com/hashicorp/go-multierror"

ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
)

var (
errNoLeader = fmt.Errorf("client: no leader")
)

// AbstractMasterErrorResponse isn't an error. It represents an RPC response
// returning an instance of the MasterErrorPB error.
// This type is used to check if the client needs to reconnect and retry a call
// in case of a call not being issued against a leader master.
type AbstractMasterErrorResponse interface {
GetError() *ybApi.MasterErrorPB
}

// NoLeaderError represents a client without a leader error.
type NoLeaderError struct{}

func (e *NoLeaderError) Error() string {
return errNoLeader.Error()
}

// RequiresReconnectError is an error indicating a need to reconnect.
type RequiresReconnectError struct {
Cause error
}

func (e *RequiresReconnectError) Error() string {
return multierror.Append(fmt.Errorf("client: requires reconnect"), e.Cause).Error()
}

// UnprocessableResponseError represents a client error where a fully read response
// cannot be deserialized as a protobuf message.
// This error usually implies that a retry is required.
type UnprocessableResponseError struct {
Cause error
ConsumedPayload []byte
}

func (e *UnprocessableResponseError) Error() string {
return multierror.Append(fmt.Errorf("client: unprocessable response"), e.Cause).Error()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/google/uuid v1.3.0
github.com/hashicorp/go-hclog v0.16.2
github.com/hashicorp/go-multierror v1.1.1
// pq used in tests:
github.com/lib/pq v1.9.0
// dockertest/v3 used in tests:
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-hclog v0.16.2 h1:K4ev2ib4LdQETX5cSZBG0DVLk1jwGqSPXBjdah3veNs=
github.com/hashicorp/go-hclog v0.16.2/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down

0 comments on commit c9e2b44

Please sign in to comment.