Skip to content

Commit

Permalink
Configure metrics provider
Browse files Browse the repository at this point in the history
  • Loading branch information
radekg committed Jan 12, 2022
1 parent 5f01689 commit ae09cf2
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 75 deletions.
30 changes: 25 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/radekg/yugabyte-db-go-client/configs"
clientErrors "github.com/radekg/yugabyte-db-go-client/errors"
"github.com/radekg/yugabyte-db-go-client/metrics"
"google.golang.org/protobuf/reflect/protoreflect"

ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
Expand All @@ -29,6 +30,8 @@ type YBClient interface {
// Uses go-hclog. Users can provide integrate with any logging
// framework using https://pkg.go.dev/github.com/hashicorp/go-hclog#InterceptLogger.
WithLogger(logger hclog.Logger) YBClient
// Allows providing custom implementation of the metrics callback.
WithMetricsCallback(callback metrics.Callback) YBClient
}

var (
Expand All @@ -47,14 +50,16 @@ type defaultYBClient struct {
isConnected bool
lock *sync.Mutex
logger hclog.Logger
metricsCallback metrics.Callback
}

// NewYBClient constructs a new instance of the high-level YugabyteDB client.
func NewYBClient(config *configs.YBClientConfig) YBClient {
return &defaultYBClient{
config: config.WithDefaults(),
lock: &sync.Mutex{},
logger: hclog.Default(),
config: config.WithDefaults(),
lock: &sync.Mutex{},
logger: hclog.Default(),
metricsCallback: metrics.Noop(),
}
}

Expand All @@ -63,6 +68,11 @@ func (c *defaultYBClient) WithLogger(logger hclog.Logger) YBClient {
return c
}

func (c *defaultYBClient) WithMetricsCallback(callback metrics.Callback) YBClient {
c.metricsCallback = callback
return c
}

func (c *defaultYBClient) Close() error {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -197,13 +207,18 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
reconnected := false
for {

c.metricsCallback.ClientReconnectAttempt()

reconnectErr := c.reconnect()

if reconnectErr == nil {
c.metricsCallback.ClientReconnectSuccess()
reconnected = true
break
}

c.metricsCallback.ClientReconnectFailure()

if currentReconnectAttempt == c.config.MaxReconnectAttempts {
break
}
Expand All @@ -219,6 +234,7 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
}

if !reconnected {

c.logger.Error("execute: failed reconnect consecutive maximum reconnect attempts",
"max-attempts", c.config.MaxReconnectAttempts,
"reason", tReconnectError.Cause)
Expand Down Expand Up @@ -268,7 +284,9 @@ func (c *defaultYBClient) connectUnsafe() error {

for hostPort, cliConfig := range validConfigs {
go func(thisHostPort string, thisConfig *configs.YBSingleNodeClientConfig) {
singleNodeClient, err := Connect(thisConfig, c.logger.Named("client"))
singleNodeClient, err := NewDefaultConnector().
WithLogger(c.logger.Named("connected-client")).
WithMetricsCallback(c.metricsCallback).Connect(thisConfig)
if err != nil {
c.logger.Error("failed creating a client",
"reason", err,
Expand Down Expand Up @@ -333,17 +351,20 @@ func (c *defaultYBClient) connectUnsafe() error {
for {
select {
case <-chanErrors:
c.metricsCallback.ClientError()
atomic.AddUint64(&done, 1)
if atomic.LoadUint64(&done) == max {
c.isConnecting = false
return &clientErrors.NoLeaderError{}
}
case connectedClient := <-chanConnectedClient:
c.metricsCallback.ClientConnect()
c.connectedClient = connectedClient
c.isConnecting = false
c.isConnected = true
return nil
case <-time.After(c.config.OpTimeout):
c.metricsCallback.ClientError()
c.isConnecting = false
return errLeaderWaitTimeout
}
Expand All @@ -354,7 +375,6 @@ func (c *defaultYBClient) connectUnsafe() error {
func (c *defaultYBClient) reconnect() error {
// ignore close error
// if the client isn't connected, it does not matter to us
//
c.closeUnsafe()
return c.connectUnsafe()
}
104 changes: 40 additions & 64 deletions client/single_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,21 @@ package client

import (
"bytes"
"crypto/tls"
"encoding/binary"
"fmt"
"net"

"github.com/hashicorp/go-hclog"
"github.com/radekg/yugabyte-db-go-client/configs"
"github.com/radekg/yugabyte-db-go-client/errors"
"github.com/radekg/yugabyte-db-go-client/metrics"
"github.com/radekg/yugabyte-db-go-client/utils"
ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
"google.golang.org/protobuf/reflect/protoreflect"
)

var recvChunkSize = 4 * 1024

// Connect connects to the master server without TLS.
func Connect(cfg *configs.YBSingleNodeClientConfig, logger hclog.Logger) (YBConnectedClient, error) {
if logger == nil {
logger = hclog.Default().Named("default-client-log")
}
if cfg.TLSConfig != nil {
return connectTLS(cfg, logger)
}
return connect(cfg, logger)
}

func connect(cfg *configs.YBSingleNodeClientConfig, logger hclog.Logger) (YBConnectedClient, error) {
logger.Debug("connecting non-TLS client")
conn, err := net.Dial("tcp", cfg.MasterHostPort)
if err != nil {
return nil, err
}
client := &defaultSingleNodeClient{
originalConfig: cfg,
chanConnected: make(chan struct{}, 1),
chanConnectErr: make(chan error, 1),
closeFunc: func() error {
return conn.Close()
},
conn: conn,
logger: logger,
svcRegistry: NewDefaultServiceRegistry(),
}
return client.afterConnect(), nil
}

func connectTLS(cfg *configs.YBSingleNodeClientConfig, logger hclog.Logger) (YBConnectedClient, error) {
logger.Debug("connecting TLS client")
conn, err := tls.Dial("tcp", cfg.MasterHostPort, cfg.TLSConfig)
if err != nil {
return nil, err
}
client := &defaultSingleNodeClient{
originalConfig: cfg,
chanConnected: make(chan struct{}, 1),
chanConnectErr: make(chan error, 1),
closeFunc: func() error {
return conn.Close()
},
conn: conn,
logger: logger,
svcRegistry: NewDefaultServiceRegistry(),
}
return client.afterConnect(), nil
}

// Connected client

// YBConnectedClient represents a connected client.
type YBConnectedClient interface {
// Close closes the connected client.
Expand All @@ -86,14 +33,15 @@ type YBConnectedClient interface {
}

type defaultSingleNodeClient struct {
originalConfig *configs.YBSingleNodeClientConfig
callCounter int
chanConnected chan struct{}
chanConnectErr chan error
closeFunc func() error
conn net.Conn
logger hclog.Logger
svcRegistry ServiceRegistry
originalConfig *configs.YBSingleNodeClientConfig
callCounter int
chanConnected chan struct{}
chanConnectErr chan error
closeFunc func() error
conn net.Conn
logger hclog.Logger
metricsCallback metrics.Callback
svcRegistry ServiceRegistry
}

// Close closes a connected client.
Expand Down Expand Up @@ -127,6 +75,11 @@ func (c *defaultSingleNodeClient) OnConnectError() <-chan error {
return c.chanConnectErr
}

func (c *defaultSingleNodeClient) WithMetricsCallback(callback metrics.Callback) YBConnectedClient {
c.metricsCallback = callback
return c
}

/// Private interface

func (c *defaultSingleNodeClient) afterConnect() *defaultSingleNodeClient {
Expand Down Expand Up @@ -171,6 +124,7 @@ func (c *defaultSingleNodeClient) recv() (*bytes.Buffer, error) {
if err != nil {
return buf, err
}
c.metricsCallback.ClientBytesReceived(n)
// we read an EOF, finished reading
// previous iteration was fitting
// all in one recvChunkSize
Expand All @@ -195,15 +149,16 @@ func (c *defaultSingleNodeClient) send(buf *bytes.Buffer) error {
if err != nil {
return err
}
c.metricsCallback.ClientBytesSent(n)
if n != nBytesToWrite {
return fmt.Errorf("incomplete write: %d bytes vs %d expected", n, nBytesToWrite)
return fmt.Errorf("write incomplete: %d bytes vs %d expected", n, nBytesToWrite)
}
return nil
}

func (c *defaultSingleNodeClient) readResponseInto(reader *bytes.Buffer, m protoreflect.ProtoMessage) error {

opLogger := c.logger.Named("read-response-into").With("message", m.ProtoReflect().Type().Descriptor().Name())
opLogger := c.logger.With("message", m.ProtoReflect().Type().Descriptor().Name())

// Read the complete data length:
// https://github.com/yugabyte/yugabyte-db/blob/v2.7.2/java/yb-client/src/main/java/org/yb/client/CallResponse.java#L71
Expand Down Expand Up @@ -355,6 +310,8 @@ func (c *defaultSingleNodeClient) executeOp(payload, result protoreflect.ProtoMe

svcInfo := c.svcRegistry.Get(payload)
if svcInfo == nil {
c.metricsCallback.ClientError()
c.metricsCallback.ClientMessageSendFailure()
return &errors.ProtoServiceError{
ProtoType: payload.ProtoReflect().Descriptor().FullName(),
}
Expand All @@ -368,22 +325,41 @@ func (c *defaultSingleNodeClient) executeOp(payload, result protoreflect.ProtoMe

b := bytes.NewBuffer([]byte{})
if err := utils.WriteMessages(b, requestHeader, payload); err != nil {
c.metricsCallback.ClientError()
c.metricsCallback.ClientMessageSendFailure()
return &errors.PayloadWriteError{
Cause: err,
Header: requestHeader,
Payload: payload,
}
}
if err := c.send(b); err != nil {
c.metricsCallback.ClientError()
c.metricsCallback.ClientMessageSendFailure()
return &errors.SendError{Cause: err}
}
buffer, err := c.recv()
if err != nil {
c.metricsCallback.ClientError()
c.metricsCallback.ClientMessageSendFailure()
return &errors.ReceiveError{Cause: err}
}
readResponseErr := c.readResponseInto(buffer, result)
if readResponseErr != nil {
c.metricsCallback.ClientError()
c.metricsCallback.ClientMessageSendFailure()
return readResponseErr
}
c.metricsCallback.ClientMessageSendSuccess()
return nil
}

func (c *defaultSingleNodeClient) withLogger(logger hclog.Logger) *defaultSingleNodeClient {
c.logger = logger
return c
}

func (c *defaultSingleNodeClient) withMetricsCallback(callback metrics.Callback) *defaultSingleNodeClient {
c.metricsCallback = callback
return c
}
Loading

0 comments on commit ae09cf2

Please sign in to comment.