Skip to content

Commit

Permalink
tso client impl (part 1)
Browse files Browse the repository at this point in the history
Changes:
1. Define the interface BaseClient which is generally for service discovery on a quorum-based cluster or a primary/secondary configured cluster so that the grpc client logic layer can decouple from the server discovery layer.
2. Rename baseClient to pdBaseClient, then refactor it to implement the BaseClient interface. It provides a basic implementation of service discovery on a quorum-based cluster.
3. Refactor pd client with the template design pattern to provide a general client-side service discovery framework and a general TSO batching, forwarding, async and pooling framework.
4. Add skeleton of tsoBaseClient which is a basic implementation of service discovery on a primary/secondary configured cluster.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 22, 2023
1 parent 94d05f0 commit dfaf5b4
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 330 deletions.
420 changes: 340 additions & 80 deletions client/base_client.go

Large diffs are not rendered by default.

347 changes: 140 additions & 207 deletions client/client.go

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestUpdateURLs(t *testing.T) {
}
return
}
cli := &baseClient{option: newOption()}
cli := &pdBaseClient{option: newOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
Expand Down Expand Up @@ -89,13 +89,12 @@ func TestGRPCDialOption(t *testing.T) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &baseClient{
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
security: SecurityOption{},
option: newOption(),
cli := &pdBaseClient{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
security: SecurityOption{},
option: newOption(),
}
cli.urls.Store([]string{testClientURL})
cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()}
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4 h1:A3F8guzyZoU1vShtTnGwqW+ZK3RxGhfAf5EZJR8+mNQ=
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
9 changes: 4 additions & 5 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// KeyspaceClient manages keyspace metadata.
Expand All @@ -39,8 +38,8 @@ type KeyspaceClient interface {

// keyspaceClient returns the KeyspaceClient from current PD leader.
func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return keyspacepb.NewKeyspaceClient(cc.(*grpc.ClientConn))
if client := c.bc.GetServingEndpointClientConn(); client != nil {
return keyspacepb.NewKeyspaceClient(client)
}
return nil
}
Expand All @@ -64,7 +63,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key

if err != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
c.bc.ScheduleCheckIfMembershipChanged()
return nil, err
}

Expand Down Expand Up @@ -143,7 +142,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

if err != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
c.bc.ScheduleCheckIfMembershipChanged()
return nil, err
}

Expand Down
8 changes: 4 additions & 4 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ResourceManagerClient interface {

// resourceManagerClient gets the ResourceManager client of current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, err := c.getOrCreateGRPCConn(c.GetLeaderAddr()); err == nil {
if cc, err := c.bc.GetOrCreateGRPCConn(c.GetLeaderAddr()); err == nil {
return rmpb.NewResourceManagerClient(cc)
}
return nil
Expand All @@ -60,7 +60,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
// gRPCErrorHandler handles a gRPC error returned by the resource manager
// service. When the error message contains errNotPrimary, it asks the base
// client to schedule a membership check. A nil error is ignored.
func (c *client) gRPCErrorHandler(err error) {
	// Guard against a nil error: calling err.Error() on it would panic.
	if err == nil {
		return
	}
	if strings.Contains(err.Error(), errNotPrimary) {
		c.bc.ScheduleCheckIfMembershipChanged()
	}
}

Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
// If the stream is still nil, return an error.
if stream == nil {
firstRequest.done <- errors.Errorf("failed to get the stream connection")
c.ScheduleCheckLeader()
c.bc.ScheduleCheckIfMembershipChanged()
connection.reset()
continue
}
Expand All @@ -315,7 +315,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
default:
}
if err = c.processTokenRequests(stream, firstRequest); err != nil {
c.ScheduleCheckLeader()
c.bc.ScheduleCheckIfMembershipChanged()
connection.reset()
log.Info("[resource_manager] token request error", zap.Error(err))
}
Expand Down
224 changes: 222 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@

package pd

import "context"
import (
"context"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"google.golang.org/grpc"
)

// TSOClient defines the APIs for requesting timestamps (TSO) within a keyspace.
type TSOClient interface {
Expand All @@ -35,7 +44,7 @@ type TSOClient interface {
// here is in a basic manner and only for testing and integration purpose -- no batching,
// no async, no pooling, no forwarding, no retry and no deliberate error handling.
func (c *client) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (physical int64, logical int64, err error) {
	// Issue exactly one request and make it keyspace-aware: the stale
	// c.GetTSAsync(ctx) call ignored keyspaceID and redeclared resp.
	resp := c.GetTSWithinKeyspaceAsync(ctx, keyspaceID)
	// Wait blocks until the future is resolved and yields the timestamp parts.
	return resp.Wait()
}

Expand All @@ -55,5 +64,216 @@ func (c *client) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32
// without block the caller.
// TODO: implement the following API
func (c *client) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) TSFuture {
	// When the caller's context carries a tracing span, open a child span and
	// attach it to the context stored on the request.
	if parent := opentracing.SpanFromContext(ctx); parent != nil {
		child := opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(parent.Context()))
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	// Take a request object from the pool and fill in this call's parameters.
	tsoReq := tsoReqPool.Get().(*tsoRequest)
	tsoReq.requestCtx = ctx
	tsoReq.clientCtx = c.ctx
	tsoReq.start = time.Now()
	tsoReq.dcLocation = dcLocation
	tsoReq.keyspaceID = keyspaceID

	err := c.dispatchRequest(dcLocation, tsoReq)
	if err == nil {
		return tsoReq
	}

	// The first dispatch failed: wait for a while and try exactly once more.
	time.Sleep(50 * time.Millisecond)
	if err = c.dispatchRequest(dcLocation, tsoReq); err != nil {
		// Deliver the failure through the request's done channel.
		tsoReq.done <- err
	}
	return tsoReq
}

// tsoBaseClient is the service discovery client of the TSO microservice, which is
// primary/standby configured.
type tsoBaseClient struct {
// clusterID is placed in the header of TSO requests (see requestHeader).
clusterID uint64
// addr -> a gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn
// dc-location -> TSO allocator leader URL
tsoAllocators sync.Map // Store as map[string]string

// primarySwitchedCallbacks will be called after the primary is switched
primarySwitchedCallbacks []func()
// membersChangedCallbacks will be called after there is any membership
// change among the service endpoints
membersChangedCallbacks []func()

// checkMembershipCh is presumably used to request a membership check
// (see ScheduleCheckIfMembershipChanged) -- TODO confirm once implemented.
checkMembershipCh chan struct{}

wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

security SecurityOption

// Client option.
option *option
}

// Compile-time check that *tsoBaseClient satisfies the BaseClient interface.
var _ BaseClient = (*tsoBaseClient)(nil)

// Init initializes the concrete client underlying.
// NOTE: this is currently a placeholder; it performs no work and always returns nil.
func (c *tsoBaseClient) Init() error {
return nil
}

// CloseClientConns closes all gRPC client connections.
// NOTE: this is currently a placeholder with an empty body; nothing is closed yet.
func (c *tsoBaseClient) CloseClientConns() {
}

// GetClusterID returns the ID of the cluster recorded on this client. The
// context argument is accepted for interface compatibility and is not used.
//
// The value is the same clusterID that requestHeader puts into TSO requests,
// so the two always agree.
func (c *tsoBaseClient) GetClusterID(context.Context) uint64 {
	return c.clusterID
}

// GetTSOAllocators returns the {dc-location -> TSO allocator leader URL} map
// owned by this client.
//
// A pointer to the client's own tsoAllocators map is returned rather than nil,
// so callers can safely call Load/Range on the result even when it is empty.
func (c *tsoBaseClient) GetTSOAllocators() *sync.Map {
	return &c.tsoAllocators
}

// GetTSOAllocatorLeaderAddrByDCLocation returns the TSO allocator leader address of
// the given dcLocation and whether it was found.
// NOTE: this is currently a placeholder; it always returns ("", false).
func (c *tsoBaseClient) GetTSOAllocatorLeaderAddrByDCLocation(dcLocation string) (string, bool) {
return "", false
}

// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection
// of the given dcLocation, together with a string (presumably the allocator's
// address -- TODO confirm once implemented).
// NOTE: this is currently a placeholder; it always returns (nil, "").
func (c *tsoBaseClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) {
return nil, ""
}

// GetServingEndpointClientConn returns the grpc client connection of the serving
// endpoint which is the leader in a quorum-based cluster or the primary in a
// primary/secondary configured cluster.
// NOTE: this is currently a placeholder; it always returns nil.
func (c *tsoBaseClient) GetServingEndpointClientConn() *grpc.ClientConn {
return nil
}

// GetServingEndpointAddr returns the serving endpoint which is the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster.
// NOTE: this is currently a placeholder; it always returns the empty string.
func (c *tsoBaseClient) GetServingEndpointAddr() string {
return ""
}

// GetBackupEndpointsAddrs reports the addresses of the backup service
// endpoints, i.e. the followers of a quorum-based cluster or the secondaries
// of a primary/secondary configured cluster.
//
// This skeleton implementation tracks no backup endpoints yet, so the result
// is always an empty (but non-nil) slice.
func (c *tsoBaseClient) GetBackupEndpointsAddrs() []string {
	return []string{}
}

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
//
// NOTE: connection management is not implemented for tsoBaseClient yet. The
// method reports that with a non-nil error instead of returning (nil, nil),
// because callers that only check err == nil would otherwise wrap a nil
// connection and fail later when it is used.
func (c *tsoBaseClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
	return nil, errors.New("tsoBaseClient: GetOrCreateGRPCConn is not implemented")
}

// ScheduleCheckIfMembershipChanged is used to trigger a check to see if there is any
// membership change among the leader/followers in a quorum-based cluster or among
// the primary/secondaries in a primary/secondary configured cluster.
// NOTE: this is currently a placeholder with an empty body; no check is scheduled.
func (c *tsoBaseClient) ScheduleCheckIfMembershipChanged() {

}

// CheckIfMembershipChanged immediately checks if there is any membership change among
// the leader/followers in a quorum-based cluster or among the primary/secondaries in a
// primary/secondary configured cluster.
// NOTE: this is currently a placeholder; it performs no check and always returns nil.
func (c *tsoBaseClient) CheckIfMembershipChanged() error {
return nil
}

// AddServiceEndpointSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
// NOTE: this is currently a placeholder with an empty body; the given callbacks
// are not stored.
func (c *tsoBaseClient) AddServiceEndpointSwitchedCallback(callbacks ...func()) {

}

// AddServiceEndpointsChangedCallback adds callbacks which will be called when any leader/follower
// in a quorum-based cluster or the primary in a primary/secondary configured cluster is changed.
// NOTE: this is currently a placeholder with an empty body; the given callbacks
// are not stored.
func (c *tsoBaseClient) AddServiceEndpointsChangedCallback(callbacks ...func()) {

}

// createTsoStreamInternal opens a TSO stream on the given TSO client to
// send/recv timestamps. While the stream is being opened, a watchdog goroutine
// (checkStreamTimeout) runs alongside and may invoke cancel; it is signalled
// once the Tso call has returned.
func (c *tsoBaseClient) createTsoStreamInternal(ctx context.Context, cancel context.CancelFunc, client tsopb.TSOClient) (interface{}, error) {
	established := make(chan struct{})
	// TODO: we need to handle a corner case that this goroutine is timeout while the stream is successfully created.
	go c.checkStreamTimeout(ctx, cancel, established)

	tsoStream, err := client.Tso(ctx)
	// Always hand one signal to the watchdog, which receives from the
	// channel exactly once on every path.
	established <- struct{}{}
	return tsoStream, err
}

// checkStreamTimeout watches a stream creation in progress. It returns as soon
// as done is signalled; otherwise, when c.option.timeout elapses first it calls
// cancel, and when ctx is done first it does nothing further. In both of the
// latter cases it then waits for the signal on done so that the sender in
// createTsoStreamInternal is never left blocked.
//
// An explicit timer is used instead of time.After so the timer is released as
// soon as this function returns, rather than lingering until the full timeout
// has elapsed on every quickly-created stream.
func (c *tsoBaseClient) checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}) {
	timer := time.NewTimer(c.option.timeout)
	defer timer.Stop()

	select {
	case <-done:
		return
	case <-timer.C:
		cancel()
	case <-ctx.Done():
	}
	// Drain the single completion signal sent by the stream creator.
	<-done
}

// requestHeader builds the header attached to TSO requests; it carries only
// the cluster ID recorded on this client.
func (c *tsoBaseClient) requestHeader() *tsopb.RequestHeader {
	header := new(tsopb.RequestHeader)
	header.ClusterId = c.clusterID
	return header
}

// CreateTsoStream opens a TSO stream over the given gRPC connection to
// send/recv timestamps. It wraps cc in a TSO client and delegates the actual
// stream creation to createTsoStreamInternal.
func (c *tsoBaseClient) CreateTsoStream(ctx context.Context, cancel context.CancelFunc, cc *grpc.ClientConn) (interface{}, error) {
	tsoClient := tsopb.NewTSOClient(cc)
	return c.createTsoStreamInternal(ctx, cancel, tsoClient)
}

// TryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
// NOTE: this is currently a placeholder; it creates no streams, leaves
// connectionCtxs untouched and always returns nil.
func (c *tsoBaseClient) TryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
return nil
}

// ProcessTSORequests processes TSO requests in streaming mode to get timestamps.
//
// It sends a single batched request asking for len(requests) timestamps in
// dcLocation over stream, waits for the response, and returns the physical,
// logical and suffix-bits parts of the returned timestamp.
//
// stream must hold a tsopb.TSO_TsoClient; any other dynamic type yields an
// error instead of a panic. Send and Recv failures are returned wrapped with a
// stack trace, and a response whose count differs from the requested count
// yields errTSOLength. On any error the numeric results are zero.
func (c *tsoBaseClient) ProcessTSORequests(stream interface{}, dcLocation string, requests []*tsoRequest,
	batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
	// Checked assertion: a mismatched stream type is reported to the caller.
	tsoStream, ok := stream.(tsopb.TSO_TsoClient)
	if !ok {
		return 0, 0, 0, errors.Errorf("unexpected TSO stream type %T", stream)
	}

	start := time.Now()
	count := int64(len(requests))
	req := &tsopb.TsoRequest{
		Header:     c.requestHeader(),
		Count:      uint32(count),
		DcLocation: dcLocation,
	}

	if err = tsoStream.Send(req); err != nil {
		return 0, 0, 0, errors.WithStack(err)
	}
	// The send latency is observed as the raw time.Duration value, unlike the
	// request duration below which is observed in seconds.
	tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))

	resp, err := tsoStream.Recv()
	if err != nil {
		return 0, 0, 0, errors.WithStack(err)
	}
	requestDurationTSO.Observe(time.Since(start).Seconds())
	tsoBatchSize.Observe(float64(count))

	// The server must hand back exactly as many timestamps as were requested.
	if resp.GetCount() != uint32(count) {
		return 0, 0, 0, errors.WithStack(errTSOLength)
	}

	ts := resp.GetTimestamp()
	return ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits(), nil
}

// GetURLs returns the URLs of the servers.
// For testing use. It should only be called when the client is closed.
//
// This skeleton implementation tracks no server URLs yet, so the result is
// always an empty (but non-nil) slice.
func (c *tsoBaseClient) GetURLs() []string {
	return []string{}
}

// GetTSOAllocatorLeaderURLs returns the URLs of the TSO allocator leaders.
// For testing use.
//
// This skeleton implementation does not read any allocator state yet, so the
// result is always a fresh, empty (but non-nil) map.
func (c *tsoBaseClient) GetTSOAllocatorLeaderURLs() map[string]string {
	return map[string]string{}
}
Loading

0 comments on commit dfaf5b4

Please sign in to comment.