Skip to content

Commit

Permalink
Add tsoStreamBuilderFactory, tsoStreamBuilder and tsoStream to furthe…
Browse files Browse the repository at this point in the history
…r refactor the code.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 27, 2023
1 parent f22e640 commit f0580b8
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 361 deletions.
231 changes: 13 additions & 218 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
Expand Down Expand Up @@ -60,6 +59,8 @@ type BaseClient interface {
CloseClientConns()
// GetClusterID returns the ID of the cluster
GetClusterID(context.Context) uint64
// GetURLs returns the URLs of the servers.
GetURLs() []string
// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
GetTSOAllocators() *sync.Map
// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation
Expand Down Expand Up @@ -97,21 +98,6 @@ type BaseClient interface {
// AddTSOAllocatorServiceEndpointSwitchedCallback adds callbacks which will be called
// when any global/local tso allocator service endpoint is switched.
AddTSOAllocatorServiceEndpointSwitchedCallback(callbacks ...func())
// CreateTsoStream creates a TSO stream to send/recv timestamps
CreateTsoStream(ctx context.Context, cancel context.CancelFunc, cc *grpc.ClientConn) (interface{}, error)
// TryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
TryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error
// ProcessTSORequests processes TSO requests in streaming mode to get timestamps
ProcessTSORequests(stream interface{}, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error)

// GetURLs returns the URLs of the servers.
// For testing use. It should only be called when the client is closed.
GetURLs() []string
// GetTSOAllocatorServingEndpointURLs returns the urls of the tso allocator leaders
// For testing use.
GetTSOAllocatorServingEndpointURLs() map[string]string
}

var _ BaseClient = (*pdBaseClient)(nil)
Expand Down Expand Up @@ -145,32 +131,20 @@ type pdBaseClient struct {
ctx context.Context
cancel context.CancelFunc

security SecurityOption

tlsCfg *tlsutil.TLSConfig
// Client option.
option *option
}

// SecurityOption records options about tls
type SecurityOption struct {
CAPath string
CertPath string
KeyPath string

SSLCABytes []byte
SSLCertBytes []byte
SSLKEYBytes []byte
}

// newPDBaseClient returns a new baseClient.
func newPDBaseClient(ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup, urls []string, security SecurityOption, option *option) BaseClient {
wg *sync.WaitGroup, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) BaseClient {
bc := &pdBaseClient{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: wg,
security: security,
tlsCfg: tlsCfg,
option: option,
}
bc.urls.Store(urls)
Expand Down Expand Up @@ -240,11 +214,17 @@ func (c *pdBaseClient) CloseClientConns() {
})
}

// getClusterID returns the ClusterID.
// GetClusterID returns the ClusterID.
func (c *pdBaseClient) GetClusterID(context.Context) uint64 {
return c.clusterID
}

// GetURLs returns the URLs of the servers.
// For testing use. It should only be called when the client is closed.
func (c *pdBaseClient) GetURLs() []string {
return c.urls.Load().([]string)
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
func (c *pdBaseClient) GetTSOAllocators() *sync.Map {
return &c.tsoAllocators
Expand Down Expand Up @@ -322,23 +302,6 @@ func (c *pdBaseClient) getFollowerAddrs() []string {
return followerAddrs.([]string)
}

// GetURLs returns the URLs of the servers.
// For testing use. It should only be called when the client is closed.
func (c *pdBaseClient) GetURLs() []string {
return c.urls.Load().([]string)
}

// GetTSOAllocatorServingEndpointURLs returns the urls of the tso allocator leaders
// For testing use.
func (c *pdBaseClient) GetTSOAllocatorServingEndpointURLs() map[string]string {
allocatorLeaders := make(map[string]string)
c.tsoAllocators.Range(func(dcLocation, url interface{}) bool {
allocatorLeaders[dcLocation.(string)] = url.(string)
return true
})
return allocatorLeaders
}

// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation
func (c *pdBaseClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) {
url, exist := c.tsoAllocators.Load(dcLocation)
Expand Down Expand Up @@ -570,173 +533,5 @@ func (c *pdBaseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Me

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr
func (c *pdBaseClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
conn, ok := c.clientConns.Load(addr)
if ok {
return conn.(*grpc.ClientConn), nil
}
tlsCfg, err := tlsutil.TLSConfig{
CAPath: c.security.CAPath,
CertPath: c.security.CertPath,
KeyPath: c.security.KeyPath,

SSLCABytes: c.security.SSLCABytes,
SSLCertBytes: c.security.SSLCertBytes,
SSLKEYBytes: c.security.SSLKEYBytes,
}.ToTLSConfig()
if err != nil {
return nil, err
}
dCtx, cancel := context.WithTimeout(c.ctx, dialTimeout)
defer cancel()
cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.option.gRPCDialOptions...)
if err != nil {
return nil, err
}
if old, ok := c.clientConns.Load(addr); ok {
cc.Close()
log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return old.(*grpc.ClientConn), nil
}
c.clientConns.Store(addr, cc)
return cc, nil
}

// CreateTsoStream creates a TSO stream to send/recv timestamps
func (c *pdBaseClient) createTsoStreamInternal(ctx context.Context, cancel context.CancelFunc, client pdpb.PDClient) (interface{}, error) {
done := make(chan struct{})
// TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created.
go c.checkStreamTimeout(ctx, cancel, done, c.option.timeout)
stream, err := client.Tso(ctx)
done <- struct{}{}
return stream, err
}

func (c *pdBaseClient) checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
select {
case <-done:
return
case <-time.After(timeout):
cancel()
case <-ctx.Done():
}
<-done
}

func (c *pdBaseClient) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.clusterID,
}
}

// CreateTsoStream creates a TSO stream to send/recv timestamps
func (c *pdBaseClient) CreateTsoStream(ctx context.Context, cancel context.CancelFunc, cc *grpc.ClientConn) (interface{}, error) {
return c.createTsoStreamInternal(ctx, cancel, pdpb.NewPDClient(cc))
}

func (c *pdBaseClient) getAllClients() map[string]pdpb.PDClient {
var (
addrs = c.GetURLs()
clients = make(map[string]pdpb.PDClient, len(addrs))
cc *grpc.ClientConn
err error
)
for _, addr := range addrs {
if len(addrs) == 0 {
continue
}
if cc, err = c.GetOrCreateGRPCConn(addr); err != nil {
continue
}
healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
clients[addr] = pdpb.NewPDClient(cc)
}
}
return clients
}

// TryConnectToTSOWithProxy will create multiple streams to all the PD servers to work as a TSO proxy
// to reduce the pressure of PD leader.
func (c *pdBaseClient) TryConnectToTSOWithProxy(
dispatcherCtx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
clients := c.getAllClients()
leaderAddr := c.getLeaderAddr()
forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc interface{}) bool {
if _, ok := clients[addr.(string)]; !ok {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
}
return true
})
// Update the missing one.
for addr, client := range clients {
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[pd] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr))
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
}
// Create the TSO stream.
stream, err := c.createTsoStreamInternal(cctx, cancel, client)
if err == nil {
if addr != leaderAddr {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addrTrim := trimHTTPPrefix(addr)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
continue
}
log.Error("[pd] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
cancel()
}
return nil
}

// ProcessTSORequests processes TSO requests in streaming mode to get timestamps
func (c *pdBaseClient) ProcessTSORequests(stream interface{}, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
tsoStream := stream.(pdpb.PD_TsoClient)

start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Header: c.requestHeader(),
Count: uint32(count),
DcLocation: dcLocation,
}

if err = tsoStream.Send(req); err != nil {
err = errors.WithStack(err)
return
}
tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))
resp, err := tsoStream.Recv()
if err != nil {
err = errors.WithStack(err)
return
}
requestDurationTSO.Observe(time.Since(start).Seconds())
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

physical, logical, suffixBits = resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
return
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
}
38 changes: 32 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -240,6 +241,7 @@ var _ Client = (*client)(nil)

type client struct {
bc BaseClient
tsoStreamBuilderFactory
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
Expand All @@ -264,6 +266,17 @@ type client struct {
option *option
}

// SecurityOption records options about tls
type SecurityOption struct {
CAPath string
CertPath string
KeyPath string

SSLCABytes []byte
SSLCertBytes []byte
SSLKEYBytes []byte
}

// NewClient creates a PD client.
func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), svrAddrs, security, opts...)
Expand All @@ -272,8 +285,9 @@ func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption)
// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", svrAddrs))
c, clientCtx, clientCancel := createClient(ctx)
c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), security, c.option)
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
c.tsoStreamBuilderFactory = &pdTSOStreamBuilderFactory{}
c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
if err := c.setup(true, true, opts...); err != nil {
return nil, err
}
Expand All @@ -283,15 +297,26 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur
// NewTSOClientWithContext creates a TSO client with context.
func NewTSOClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd(tso)] create tso client with endpoints", zap.Strings("tso-address", svrAddrs))
c, clientCtx, clientCancel := createClient(ctx)
c.bc = newTSOBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), security, c.option)
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
c.tsoStreamBuilderFactory = &tsoTSOStreamBuilderFactory{}
c.bc = newTSOBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
if err := c.setup(true, false, opts...); err != nil {
return nil, err
}
return c, nil
}

func createClient(ctx context.Context) (*client, context.Context, context.CancelFunc) {
func createClient(ctx context.Context, security *SecurityOption) (*client, context.Context, context.CancelFunc, *tlsutil.TLSConfig) {
tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
KeyPath: security.KeyPath,

SSLCABytes: security.SSLCABytes,
SSLCertBytes: security.SSLCertBytes,
SSLKEYBytes: security.SSLKEYBytes,
}

clientCtx, clientCancel := context.WithCancel(ctx)
c := &client{
checkTSDeadlineCh: make(chan struct{}),
Expand All @@ -302,7 +327,8 @@ func createClient(ctx context.Context) (*client, context.Context, context.Cancel
cancel: clientCancel,
option: newOption(),
}
return c, clientCtx, clientCancel

return c, clientCtx, clientCancel, tlsCfg
}

func (c *client) setup(enableTSO, enableAdmissionCtl bool, opts ...ClientOption) error {
Expand Down
3 changes: 2 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestGRPCDialOption(t *testing.T) {
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
security: SecurityOption{},
tlsCfg: &tlsutil.TLSConfig{},
option: newOption(),
}
cli.urls.Store([]string{testClientURL})
Expand Down
Loading

0 comments on commit f0580b8

Please sign in to comment.