Skip to content

Commit

Permalink
More proper error types in single node client
Browse files Browse the repository at this point in the history
  • Loading branch information
radekg committed Jan 11, 2022
1 parent 17f410b commit 0f3714b
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 36 deletions.
3 changes: 1 addition & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/radekg/yugabyte-db-go-client/configs"
clientErrors "github.com/radekg/yugabyte-db-go-client/errors"
"google.golang.org/protobuf/reflect/protoreflect"
Expand Down Expand Up @@ -199,7 +198,7 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
c.logger.Error("execute: failed reconnect consecutive maximum reconnect attempts",
"max-attempts", c.config.MaxReconnectAttempts,
"reason", tReconnectError.Cause)
return multierror.Append(errNotReconnected, tReconnectError.Cause)
return fmt.Errorf("%s: %s", errNotReconnected.Error(), tReconnectError.Cause.Error())
}

// retry:
Expand Down
32 changes: 27 additions & 5 deletions client/single_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,18 @@ func (c *defaultSingleNodeClient) afterConnect() *defaultSingleNodeClient {
header := append([]byte("YB"), 1)
n, err := c.conn.Write(header)
if err != nil {
c.chanConnectErr <- err
c.chanConnectErr <- &errors.ProtocolConnectionHeaderWriteError{
Cause: err,
}
close(c.chanConnected)
return
}
if n != len(header) {
c.chanConnectErr <- fmt.Errorf("header not written: %d vs expected %d", n, len(header))
c.chanConnectErr <- &errors.ProtocolConnectionHeaderWriteIncompleteError{
Header: header,
Expected: len(header),
Written: n,
}
close(c.chanConnected)
return
}
Expand Down Expand Up @@ -190,6 +196,7 @@ func (c *defaultSingleNodeClient) send(buf *bytes.Buffer) error {
return err
}
if n != nBytesToWrite {
// TODO: proper error type
return fmt.Errorf("not all bytes written: %d vs expected %d", n, nBytesToWrite)
}
return nil
Expand All @@ -204,6 +211,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
var dataLength int32
if err := binary.Read(reader, binary.BigEndian, &dataLength); err != nil {
opLogger.Error("failed reading response header length", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -213,6 +221,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
responseHeaderLength, err := utils.ReadUvarint32(reader)
if err != nil {
opLogger.Error("failed reading response header length", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -224,6 +233,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
n, err := reader.Read(responseHeaderBuf)
if err != nil {
opLogger.Error("failed reading response header", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -237,13 +247,15 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
opLogger.Error("response header read bytes count != expected count",
"expected-header-length", responseHeaderLength,
"read-header-length", n)
// TODO: proper error type
return fmt.Errorf("expected to read %d but read %d", responseHeaderLength, n)
}

responseHeader := &ybApi.ResponseHeader{}
protoErr := utils.DeserializeProto(responseHeaderBuf, responseHeader)
if protoErr != nil {
opLogger.Error("failed unmarshalling response header", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -258,6 +270,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
responsePayloadLength, err := utils.ReadUvarint32(reader)
if err != nil {
opLogger.Error("failed reading response payload length", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -274,6 +287,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
n, err = reader.Read(responsePayloadBuf)
if err != nil {
opLogger.Error("failed reading response payload", "reason", err)
// TODO: proper error type
return err
}

Expand All @@ -291,6 +305,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
opLogger.Error("response payload read bytes count != expected count",
"expected-payload-length", responsePayloadLength,
"read-payload-length", n)
// TODO: proper error type
return fmt.Errorf("expected to read %d but read %d", responsePayloadLength, n)
}

Expand All @@ -308,6 +323,7 @@ func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m proto
opLogger.Error("consumed too much data",
"expected-payload-length", responsePayloadLength,
"consumed-payload-length", n)
// TODO: proper error type
return fmt.Errorf("consumed too much data, expected %d but read %d", responsePayloadLength, n)
}
}
Expand All @@ -328,7 +344,9 @@ func (c *defaultSingleNodeClient) executeOp(payload, result protoreflect.ProtoMe

svcInfo := c.svcRegistry.Get(payload)
if svcInfo == nil {
return fmt.Errorf("no service info for proto type '%s'", payload.ProtoReflect().Descriptor().FullName()) // TODO: introduce a proper error type
return &errors.ProtoServiceError{
ProtoType: payload.ProtoReflect().Descriptor().FullName(),
}
}

requestHeader := &ybApi.RequestHeader{
Expand All @@ -339,12 +357,16 @@ func (c *defaultSingleNodeClient) executeOp(payload, result protoreflect.ProtoMe

b := bytes.NewBuffer([]byte{})
if err := utils.WriteMessages(b, requestHeader, payload); err != nil {
return err
return &errors.PayloadWriteError{
Cause: err,
Header: requestHeader,
Payload: payload,
}
}
if err := c.send(b); err != nil {
return &errors.SendReceiveError{Cause: err}
}
buffer, err := c.recv() // TODO: can move this to readResponseInto
buffer, err := c.recv()
if err != nil {
return &errors.SendReceiveError{Cause: err}
}
Expand Down
88 changes: 74 additions & 14 deletions errors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,37 @@ package errors
import (
"fmt"

"github.com/hashicorp/go-multierror"
"google.golang.org/protobuf/reflect/protoreflect"

ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
)

const (
ErrorMessageConnected = "client: connected"
ErrorMessageConnecting = "client: connecting"
ErrorMessageLeaderWaitTimeout = "client: leader wait timed out"
ErrorMessageNoClient = "client: no client"
ErrorMessageNoLeader = "client: no leader"
ErrorMessageNotConnected = "client: not connected"
ErrorMessageReconnectFailed = "client: reconnect failed"
ErrorMessageReconnectRequired = "client: reconnect required"
ErrorMessageSendReceiveFailed = "client: send/receive failed"
// ErrorMessageConnected is an error message.
ErrorMessageConnected = "client: connected"
// ErrorMessageConnecting is an error message.
ErrorMessageConnecting = "client: connecting"
// ErrorMessageLeaderWaitTimeout is an error message.
ErrorMessageLeaderWaitTimeout = "client: leader wait timed out"
// ErrorMessageNoClient is an error message.
ErrorMessageNoClient = "client: no client"
// ErrorMessageNoLeader is an error message.
ErrorMessageNoLeader = "client: no leader"
// ErrorMessageNotConnected is an error message.
ErrorMessageNotConnected = "client: not connected"
// ErrorMessagePayloadError is an error message.
ErrorMessagePayloadError = "client: payload error"
// ErrorMessageProtocolConnectionHeader is an error message.
ErrorMessageProtocolConnectionHeader = "client: protocol connection header error"
// ErrorMessageProtoServiceError is an error message.
ErrorMessageProtoServiceError = "client: proto service error"
// ErrorMessageReconnectFailed is an error message.
ErrorMessageReconnectFailed = "client: reconnect failed"
// ErrorMessageReconnectRequired is an error message.
ErrorMessageReconnectRequired = "client: reconnect required"
// ErrorMessageSendReceiveFailed is an error message.
ErrorMessageSendReceiveFailed = "client: send/receive failed"
// ErrorMessageUnprocessableResponse is an error message.
ErrorMessageUnprocessableResponse = "client: unprocessable response"
)

Expand All @@ -36,23 +52,67 @@ func (e *NoLeaderError) Error() string {
return ErrorMessageNoLeader
}

// PayloadWriteError happens when the client cannot serialize the header
// or the payload. This is a non-recoverable error.
type PayloadWriteError struct {
Cause error
Header *ybApi.RequestHeader
Payload protoreflect.ProtoMessage
}

func (e *PayloadWriteError) Error() string {
return fmt.Sprintf("%s: %s", ErrorMessagePayloadError, e.Cause.Error())
}

// ProtocolConnectionHeaderWriteError is an error returned when the initial
// connect header could not be written.
type ProtocolConnectionHeaderWriteError struct {
Cause error
}

func (e *ProtocolConnectionHeaderWriteError) Error() string {
return fmt.Sprintf("%s: %s", ErrorMessageProtocolConnectionHeader, e.Cause.Error())
}

// ProtocolConnectionHeaderWriteIncompleteError is an error returned when the initial
// connect header could not be fully written.
type ProtocolConnectionHeaderWriteIncompleteError struct {
Header []byte
Expected int
Written int
}

func (e *ProtocolConnectionHeaderWriteIncompleteError) Error() string {
return fmt.Sprintf("%s: written %d bytes vs expected %d bytes", ErrorMessageProtocolConnectionHeader, e.Written, e.Expected)
}

// ProtoServiceError happens when the service registry cannot identify
// a service for a protobuf type. This is a non-recoverable error.
type ProtoServiceError struct {
ProtoType protoreflect.FullName
}

func (e *ProtoServiceError) Error() string {
return fmt.Sprintf("%s: %s", ErrorMessageProtoServiceError, e.ProtoType)
}

// RequiresReconnectError is an error indicating a need to reconnect.
type RequiresReconnectError struct {
Cause error
}

func (e *RequiresReconnectError) Error() string {
return multierror.Append(fmt.Errorf(ErrorMessageReconnectRequired), e.Cause).Error()
return fmt.Sprintf("%s: no service for type '%s'", ErrorMessageReconnectRequired, e.Cause.Error())
}

// SendReceiveError is returned when the client is unable to
// send the paylod or receive from the server.
// send the payload or receive from the server.
type SendReceiveError struct {
Cause error
}

func (e *SendReceiveError) Error() string {
return multierror.Append(fmt.Errorf(ErrorMessageSendReceiveFailed), e.Cause).Error()
return fmt.Sprintf("%s: %s", ErrorMessageSendReceiveFailed, e.Cause.Error())
}

// UnprocessableResponseError represents a client error where a fully read response
Expand All @@ -64,5 +124,5 @@ type UnprocessableResponseError struct {
}

func (e *UnprocessableResponseError) Error() string {
return multierror.Append(fmt.Errorf(ErrorMessageUnprocessableResponse), e.Cause).Error()
return fmt.Sprintf("%s: %s", ErrorMessageUnprocessableResponse, e.Cause.Error())
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.16
require (
github.com/google/uuid v1.3.0
github.com/hashicorp/go-hclog v0.16.2
github.com/hashicorp/go-multierror v1.1.1
// pq used in tests:
github.com/lib/pq v1.9.0
// dockertest/v3 used in tests:
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,8 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-hclog v0.16.2 h1:K4ev2ib4LdQETX5cSZBG0DVLk1jwGqSPXBjdah3veNs=
github.com/hashicorp/go-hclog v0.16.2/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down
12 changes: 2 additions & 10 deletions testutils/master/master_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package master

import (
"strings"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/radekg/yugabyte-db-go-client/client"
"github.com/radekg/yugabyte-db-go-client/configs"
"github.com/radekg/yugabyte-db-go-client/errors"
Expand Down Expand Up @@ -95,15 +95,7 @@ func TestMasterReconnect(t *testing.T) {
err := client.Execute(request, response)
assert.NotNil(t, err)

wasReconnectFailedError := false
if tMultiError, ok := err.(*multierror.Error); ok {
for _, me := range tMultiError.Errors {
if me.Error() == errors.ErrorMessageReconnectFailed {
wasReconnectFailedError = true
break
}
}
}
wasReconnectFailedError := strings.HasPrefix(err.Error(), errors.ErrorMessageReconnectFailed)
assert.True(t, wasReconnectFailedError, "expected reconnect failed error")

}

0 comments on commit 0f3714b

Please sign in to comment.