Skip to content

Commit

Permalink
Test retries and fix/improve related code
Browse files Browse the repository at this point in the history
  • Loading branch information
radekg committed Jan 11, 2022
1 parent 8beb849 commit 17f410b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 29 deletions.
53 changes: 32 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ type YBClient interface {
}

var (
errAlreadyConnected = fmt.Errorf("client: already connected")
errConnecting = fmt.Errorf("client: connecting")
errLeaderWaitTimeout = fmt.Errorf("client: leader wait timed out")
errNoClient = fmt.Errorf("client: no client")
errNotReconnected = fmt.Errorf("client: reconnect failed")
errConnected = fmt.Errorf(clientErrors.ErrorMessageConnected)
errConnecting = fmt.Errorf(clientErrors.ErrorMessageConnecting)
errLeaderWaitTimeout = fmt.Errorf(clientErrors.ErrorMessageLeaderWaitTimeout)
errNoClient = fmt.Errorf(clientErrors.ErrorMessageNoClient)
errNotConnected = fmt.Errorf(clientErrors.ErrorMessageNotConnected)
errNotReconnected = fmt.Errorf(clientErrors.ErrorMessageReconnectFailed)
)

type defaultYBClient struct {
config *configs.YBClientConfig
connectedClient YBConnectedClient
isConnecting bool
isConnected bool
lock *sync.Mutex
logger hclog.Logger
}
Expand All @@ -59,28 +61,32 @@ func (c *defaultYBClient) Close() error {
if c.connectedClient == nil {
return errNoClient
}
return c.closeUnsafe()
closeError := c.closeUnsafe()
c.isConnected = false
c.connectedClient = nil
return closeError
}

func (c *defaultYBClient) Connect() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.isConnecting {
return errConnecting
}

if c.connectedClient != nil {
return errAlreadyConnected
if c.isConnected || c.connectedClient != nil {
return errConnected
}

return c.connectUnsafe()
}

func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) error {
c.lock.Lock()
defer c.lock.Unlock()

if !c.isConnected {
return errNotConnected
}

if c.connectedClient == nil {
return errNoClient
}
Expand Down Expand Up @@ -131,14 +137,19 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
return reportErr
}

// broken pipe qualifies for immediate retry:
if errors.Is(executeErr, syscall.EPIPE) {
// broken pipe qualifies for immediate retry:
executeErr = &clientErrors.RequiresReconnectError{
Cause: executeErr,
}
}

if _, ok := executeErr.(*clientErrors.UnprocessableResponseError); ok {
} else if _, ok := executeErr.(*clientErrors.SendReceiveError); ok {
// the client was connected but is no longer able to
// communicate with the server, this qualifies
// for reconnect
executeErr = &clientErrors.RequiresReconnectError{
Cause: executeErr,
}
} else if _, ok := executeErr.(*clientErrors.UnprocessableResponseError); ok {
// complete payload has been read from the server
// but payload could not be deserialized as protobuf,
// this qualifies for immediate retry:
Expand Down Expand Up @@ -206,9 +217,7 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
}

func (c *defaultYBClient) closeUnsafe() error {
closeError := c.connectedClient.Close()
c.connectedClient = nil
return closeError
return c.connectedClient.Close()
}

func (c *defaultYBClient) connectUnsafe() error {
Expand Down Expand Up @@ -309,6 +318,7 @@ func (c *defaultYBClient) connectUnsafe() error {
case connectedClient := <-chanConnectedClient:
c.connectedClient = connectedClient
c.isConnecting = false
c.isConnected = true
return nil
case <-time.After(c.config.OpTimeout):
c.isConnecting = false
Expand All @@ -319,8 +329,9 @@ func (c *defaultYBClient) connectUnsafe() error {
}

func (c *defaultYBClient) reconnect() error {
if err := c.closeUnsafe(); err != nil {
return err
}
// ignore close error
// if the client isn't connected, it does not matter to us
//
c.closeUnsafe()
return c.connectUnsafe()
}
4 changes: 2 additions & 2 deletions client/single_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,11 @@ func (c *defaultSingleNodeClient) executeOp(payload, result protoreflect.ProtoMe
return err
}
if err := c.send(b); err != nil {
return err
return &errors.SendReceiveError{Cause: err}
}
buffer, err := c.recv() // TODO: can move this to readResponseInto
if err != nil {
return err
return &errors.SendReceiveError{Cause: err}
}
readResponseErr := c.readResponseInto(buffer, result)
if readResponseErr != nil {
Expand Down
29 changes: 24 additions & 5 deletions errors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ import (
ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
)

var (
errNoLeader = fmt.Errorf("client: no leader")
const (
ErrorMessageConnected = "client: connected"
ErrorMessageConnecting = "client: connecting"
ErrorMessageLeaderWaitTimeout = "client: leader wait timed out"
ErrorMessageNoClient = "client: no client"
ErrorMessageNoLeader = "client: no leader"
ErrorMessageNotConnected = "client: not connected"
ErrorMessageReconnectFailed = "client: reconnect failed"
ErrorMessageReconnectRequired = "client: reconnect required"
ErrorMessageSendReceiveFailed = "client: send/receive failed"
ErrorMessageUnprocessableResponse = "client: unprocessable response"
)

// AbstractMasterErrorResponse isn't an error. It represents an RPC response
Expand All @@ -24,7 +33,7 @@ type AbstractMasterErrorResponse interface {
type NoLeaderError struct{}

func (e *NoLeaderError) Error() string {
return errNoLeader.Error()
return ErrorMessageNoLeader
}

// RequiresReconnectError is an error indicating a need to reconnect.
Expand All @@ -33,7 +42,17 @@ type RequiresReconnectError struct {
}

func (e *RequiresReconnectError) Error() string {
return multierror.Append(fmt.Errorf("client: requires reconnect"), e.Cause).Error()
return multierror.Append(fmt.Errorf(ErrorMessageReconnectRequired), e.Cause).Error()
}

// SendReceiveError is returned when the client is unable to
// send the paylod or receive from the server.
type SendReceiveError struct {
Cause error
}

func (e *SendReceiveError) Error() string {
return multierror.Append(fmt.Errorf(ErrorMessageSendReceiveFailed), e.Cause).Error()
}

// UnprocessableResponseError represents a client error where a fully read response
Expand All @@ -45,5 +64,5 @@ type UnprocessableResponseError struct {
}

func (e *UnprocessableResponseError) Error() string {
return multierror.Append(fmt.Errorf("client: unprocessable response"), e.Cause).Error()
return multierror.Append(fmt.Errorf(ErrorMessageUnprocessableResponse), e.Cause).Error()
}
64 changes: 63 additions & 1 deletion testutils/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/radekg/yugabyte-db-go-client/client"
"github.com/radekg/yugabyte-db-go-client/configs"
"github.com/radekg/yugabyte-db-go-client/errors"
"github.com/radekg/yugabyte-db-go-client/testutils/common"
ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
"github.com/stretchr/testify/assert"
)

func TestMasterIntegration(t *testing.T) {
Expand Down Expand Up @@ -40,8 +43,67 @@ func TestMasterIntegration(t *testing.T) {
if err != nil {
return err
}
t.Log(" ==> Received master list", response)
t.Log("Received master list", response)
return nil
})

}

func TestMasterReconnect(t *testing.T) {

request := &ybApi.ListMastersRequestPB{}

testCtx := SetupMasters(t, &common.TestMasterConfiguration{
ReplicationFactor: 1,
MasterPrefix: "master-it",
})
defer testCtx.Cleanup()

client := client.NewYBClient(&configs.YBClientConfig{
MasterHostPort: testCtx.MasterExternalAddresses(),
OpTimeout: time.Duration(time.Second * 5),
MaxReconnectAttempts: 1,
ReconnectRetryInterval: time.Duration(time.Millisecond * 100),
}, hclog.Default())

errNotConnected := client.Execute(request, &ybApi.ListMastersResponsePB{})
assert.NotNil(t, errNotConnected, "expected an error")

common.Eventually(t, 15, func() error {
if err := client.Connect(); err != nil {
return err
}
return nil
})

defer client.Close()

common.Eventually(t, 15, func() error {

response := &ybApi.ListMastersResponsePB{}
err := client.Execute(request, response)
if err != nil {
return err
}
t.Log("Received master list", response)
return nil
})

testCtx.Cleanup()

response := &ybApi.ListMastersResponsePB{}
err := client.Execute(request, response)
assert.NotNil(t, err)

wasReconnectFailedError := false
if tMultiError, ok := err.(*multierror.Error); ok {
for _, me := range tMultiError.Errors {
if me.Error() == errors.ErrorMessageReconnectFailed {
wasReconnectFailedError = true
break
}
}
}
assert.True(t, wasReconnectFailedError, "expected reconnect failed error")

}

0 comments on commit 17f410b

Please sign in to comment.