diff --git a/client/base_client.go b/client/base_client.go
index aa48c7f5a9f..b57ef105f1a 100644
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -39,63 +39,120 @@ const (
 	memberUpdateInterval = time.Minute
 )
 
-// baseClient is a basic client for all other complex client.
-type baseClient struct {
-	urls      atomic.Value // Store as []string
-	clusterID uint64
+type tsoRequest struct {
+	start      time.Time
+	clientCtx  context.Context
+	requestCtx context.Context
+	done       chan error
+	physical   int64
+	logical    int64
+	dcLocation string
+	keyspaceID uint32
+}
+
+// BaseClient defines the general interface for service discovery on a quorum-based cluster
+// or a primary/secondary configured cluster.
+type BaseClient interface {
+	// Init initializes the underlying concrete client
+	Init() error
+	// CloseClientConns closes all gRPC client connections
+	CloseClientConns()
+	// GetClusterID returns the ID of the cluster
+	GetClusterID(context.Context) uint64
+	// GetURLs returns the URLs of the servers.
+	GetURLs() []string
+	// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
+	GetTSOAllocators() *sync.Map
+	// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator serving address of the given dcLocation
+	GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool)
+	// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection
+	// of the given dcLocation
+	GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string)
+	// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
+	// which is the leader in a quorum-based cluster or the primary in a primary/secondary
+	// configured cluster.
+	GetServingEndpointClientConn() *grpc.ClientConn
+	// GetServingAddr returns the serving endpoint which is the leader in a quorum-based cluster
+	// or the primary in a primary/secondary configured cluster.
+	GetServingAddr() string
+	// GetBackupAddrs gets the addresses of the current reachable and healthy backup service
+	// endpoints randomly. Backup service endpoints are followers in a quorum-based cluster or
+	// secondaries in a primary/secondary configured cluster.
+	GetBackupAddrs() []string
+	// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr
+	GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
+	// ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change
+	// among the leader/followers in a quorum-based cluster or among the primary/secondaries in a
+	// primary/secondary configured cluster.
+	ScheduleCheckMemberChanged()
+	// CheckMemberChanged immediately checks if there is any membership change among the leader/followers
+	// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
+	CheckMemberChanged() error
+	// AddServingAddrSwitchedCallback adds callbacks which will be called when the leader
+	// in a quorum-based cluster or the primary in a primary/secondary configured cluster
+	// is switched.
+	AddServingAddrSwitchedCallback(callbacks ...func())
+	// AddServiceAddrsSwitchedCallback adds callbacks which will be called when any leader/follower
+	// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
+	// is changed.
+	AddServiceAddrsSwitchedCallback(callbacks ...func())
+	// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called
+	// when any global/local tso allocator service endpoint is switched.
+ AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) +} + +var _ BaseClient = (*pdBaseClient)(nil) + +// pdBaseClient is the service discovery client of PD/API service which is quorum based +type pdBaseClient struct { + urls atomic.Value // Store as []string // PD leader URL leader atomic.Value // Store as string // PD follower URLs followers atomic.Value // Store as []string - // addr -> TSO gRPC connection + + clusterID uint64 + // addr -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn // dc-location -> TSO allocator leader URL - allocators sync.Map // Store as map[string]string + tsoAllocators sync.Map // Store as map[string]string - checkLeaderCh chan struct{} - checkTSODispatcherCh chan struct{} - updateConnectionCtxsCh chan struct{} - updateTokenConnectionCh chan struct{} + // leaderSwitchedCallbacks will be called after the leader swichted + leaderSwitchedCallbacks []func() + // membersChangedCallbacks will be called after there is any membership + // change in the leader and followers + membersChangedCallbacks []func() + // tsoAllocatorLeaderSwitchedCallback will be called when any global/local + // tso allocator leader is switched. + tsoAllocatorLeaderSwitchedCallback []func() - wg sync.WaitGroup + checkMembershipCh chan struct{} + + wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc - security SecurityOption - + tlsCfg *tlsutil.TLSConfig // Client option. option *option } -// SecurityOption records options about tls -type SecurityOption struct { - CAPath string - CertPath string - KeyPath string - - SSLCABytes []byte - SSLCertBytes []byte - SSLKEYBytes []byte -} - -// newBaseClient returns a new baseClient. -func newBaseClient(ctx context.Context, urls []string, security SecurityOption) *baseClient { - clientCtx, clientCancel := context.WithCancel(ctx) - bc := &baseClient{ - checkLeaderCh: make(chan struct{}, 1), - checkTSODispatcherCh: make(chan struct{}, 1), - updateConnectionCtxsCh: make(chan struct{}, 1), - updateTokenConnectionCh: make(chan struct{}, 1), - ctx: clientCtx, - cancel: clientCancel, - security: security, - option: newOption(), +// newPDBaseClient returns a new baseClient. +func newPDBaseClient(ctx context.Context, cancel context.CancelFunc, + wg *sync.WaitGroup, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) BaseClient { + bc := &pdBaseClient{ + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + tlsCfg: tlsCfg, + option: option, } bc.urls.Store(urls) return bc } -func (c *baseClient) init() error { +func (c *pdBaseClient) Init() error { if err := c.initRetry(c.initClusterID); err != nil { c.cancel() return err @@ -111,7 +168,7 @@ func (c *baseClient) init() error { return nil } -func (c *baseClient) initRetry(f func() error) error { +func (c *pdBaseClient) initRetry(f func() error) error { var err error for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(); err == nil { @@ -126,7 +183,7 @@ func (c *baseClient) initRetry(f func() error) error { return errors.WithStack(err) } -func (c *baseClient) memberLoop() { +func (c *pdBaseClient) memberLoop() { defer c.wg.Done() ctx, cancel := context.WithCancel(c.ctx) @@ -134,7 +191,7 @@ func (c *baseClient) memberLoop() { for { select { - case <-c.checkLeaderCh: + case <-c.checkMembershipCh: case <-time.After(memberUpdateInterval): case <-ctx.Done(): return @@ -148,42 +205,88 @@ func (c *baseClient) memberLoop() { } } -// ScheduleCheckLeader is used to check leader. 
-func (c *baseClient) ScheduleCheckLeader() { - select { - case c.checkLeaderCh <- struct{}{}: - default: - } +// Close all grpc client connnections +func (c *pdBaseClient) CloseClientConns() { + c.clientConns.Range(func(_, cc interface{}) bool { + if err := cc.(*grpc.ClientConn).Close(); err != nil { + log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) + } + return true + }) } -func (c *baseClient) scheduleCheckTSODispatcher() { - select { - case c.checkTSODispatcherCh <- struct{}{}: - default: - } +// GetClusterID returns the ClusterID. +func (c *pdBaseClient) GetClusterID(context.Context) uint64 { + return c.clusterID } -func (c *baseClient) scheduleUpdateConnectionCtxs() { - select { - case c.updateConnectionCtxsCh <- struct{}{}: - default: +// GetURLs returns the URLs of the servers. +// For testing use. It should only be called when the client is closed. +func (c *pdBaseClient) GetURLs() []string { + return c.urls.Load().([]string) +} + +// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map +func (c *pdBaseClient) GetTSOAllocators() *sync.Map { + return &c.tsoAllocators +} + +// GetServingAddr returns the grpc client connection of the serving endpoint +// which is the leader in a quorum-based cluster or the primary in a primary/secondy +// configured cluster. +func (c *pdBaseClient) GetServingEndpointClientConn() *grpc.ClientConn { + if cc, ok := c.clientConns.Load(c.getLeaderAddr()); ok { + return cc.(*grpc.ClientConn) } + return nil +} + +// GetServingAddr returns the leader address +func (c *pdBaseClient) GetServingAddr() string { + return c.getLeaderAddr() +} + +// GetBackupAddrs gets the addresses of the current reachable and healthy followers +// in a quorum-based cluster. +func (c *pdBaseClient) GetBackupAddrs() []string { + return c.getFollowerAddrs() } -func (c *baseClient) scheduleUpdateTokenConnection() { +// ScheduleCheckMemberChanged is used to check if there is any membership +// change among the leader and the followers. +func (c *pdBaseClient) ScheduleCheckMemberChanged() { select { - case c.updateTokenConnectionCh <- struct{}{}: + case c.checkMembershipCh <- struct{}{}: default: } } -// GetClusterID returns the ClusterID. -func (c *baseClient) GetClusterID(context.Context) uint64 { - return c.clusterID +// Immediately check if there is any membership change among the leader/followers in a +// quorum-based cluster or among the primary/secondaries in a primary/secondy configured cluster. +func (c *pdBaseClient) CheckMemberChanged() error { + return c.updateMember() +} + +// AddServingAddrSwitchedCallback adds callbacks which will be called +// when the leader is switched. +func (c *pdBaseClient) AddServingAddrSwitchedCallback(callbacks ...func()) { + c.leaderSwitchedCallbacks = append(c.leaderSwitchedCallbacks, callbacks...) +} + +// AddServiceAddrsSwitchedCallback adds callbacks which will be called when +// any leader/follower is changed. +func (c *pdBaseClient) AddServiceAddrsSwitchedCallback(callbacks ...func()) { + c.membersChangedCallbacks = append(c.membersChangedCallbacks, callbacks...) +} + +// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called +// when any global/local tso allocator leader is switched. +func (c *pdBaseClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { + c.tsoAllocatorLeaderSwitchedCallback = append(c.tsoAllocatorLeaderSwitchedCallback, callbacks...) } -// GetLeaderAddr returns the leader address. 
-func (c *baseClient) GetLeaderAddr() string { +// getLeaderAddr returns the leader address. +func (c *pdBaseClient) getLeaderAddr() string { leaderAddr := c.leader.Load() if leaderAddr == nil { return "" @@ -191,8 +294,8 @@ func (c *baseClient) GetLeaderAddr() string { return leaderAddr.(string) } -// GetFollowerAddrs returns the follower address. -func (c *baseClient) GetFollowerAddrs() []string { +// getFollowerAddrs returns the follower address. +func (c *pdBaseClient) getFollowerAddrs() []string { followerAddrs := c.followers.Load() if followerAddrs == nil { return []string{} @@ -200,31 +303,18 @@ func (c *baseClient) GetFollowerAddrs() []string { return followerAddrs.([]string) } -// GetURLs returns the URLs. -// For testing use. It should only be called when the client is closed. -func (c *baseClient) GetURLs() []string { - return c.urls.Load().([]string) -} - -func (c *baseClient) GetAllocatorLeaderURLs() map[string]string { - allocatorLeader := make(map[string]string) - c.allocators.Range(func(dcLocation, url interface{}) bool { - allocatorLeader[dcLocation.(string)] = url.(string) - return true - }) - return allocatorLeader -} - -func (c *baseClient) getAllocatorLeaderAddrByDCLocation(dcLocation string) (string, bool) { - url, exist := c.allocators.Load(dcLocation) +// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation +func (c *pdBaseClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { + url, exist := c.tsoAllocators.Load(dcLocation) if !exist { return "", false } return url.(string), true } -func (c *baseClient) getAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { - url, ok := c.allocators.Load(dcLocation) +// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection of the given dcLocation +func (c *pdBaseClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { + url, ok := c.tsoAllocators.Load(dcLocation) if !ok { panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation)) } @@ -235,9 +325,9 @@ func (c *baseClient) getAllocatorClientConnByDCLocation(dcLocation string) (*grp return cc.(*grpc.ClientConn), url.(string) } -func (c *baseClient) gcAllocatorLeaderAddr(curAllocatorMap map[string]*pdpb.Member) { +func (c *pdBaseClient) gcAllocatorLeaderAddr(curAllocatorMap map[string]*pdpb.Member) { // Clean up the old TSO allocators - c.allocators.Range(func(dcLocationKey, _ interface{}) bool { + c.tsoAllocators.Range(func(dcLocationKey, _ interface{}) bool { dcLocation := dcLocationKey.(string) // Skip the Global TSO Allocator if dcLocation == globalDCLocation { @@ -245,13 +335,13 @@ func (c *baseClient) gcAllocatorLeaderAddr(curAllocatorMap map[string]*pdpb.Memb } if _, exist := curAllocatorMap[dcLocation]; !exist { log.Info("[pd] delete unused tso allocator", zap.String("dc-location", dcLocation)) - c.allocators.Delete(dcLocation) + c.tsoAllocators.Delete(dcLocation) } return true }) } -func (c *baseClient) initClusterID() error { +func (c *pdBaseClient) initClusterID() error { ctx, cancel := context.WithCancel(c.ctx) defer cancel() var clusterID uint64 @@ -281,7 +371,7 @@ func (c *baseClient) initClusterID() error { return nil } -func (c *baseClient) updateMember() error { +func (c *pdBaseClient) updateMember() error { for i, u := range c.GetURLs() { failpoint.Inject("skipFirstUpdateMember", func() { if i == 0 { @@ -321,7 +411,10 @@ func (c *baseClient) updateMember() error { if err := 
c.switchLeader(members.GetLeader().GetClientUrls()); err != nil { return err } - c.scheduleCheckTSODispatcher() + // Run callbacks to refelect any change in the local/global tso allocator. + for _, cb := range c.tsoAllocatorLeaderSwitchedCallback { + cb() + } // If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error, // the error of `switchTSOAllocatorLeader` will be returned. @@ -330,10 +423,10 @@ func (c *baseClient) updateMember() error { return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs()) } -func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { +func (c *pdBaseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - cc, err := c.getOrCreateGRPCConn(url) + cc, err := c.GetOrCreateGRPCConn(url) if err != nil { return nil, err } @@ -349,7 +442,7 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du return members, nil } -func (c *baseClient) updateURLs(members []*pdpb.Member) { +func (c *pdBaseClient) updateURLs(members []*pdpb.Member) { urls := make([]string, 0, len(members)) for _, m := range members { urls = append(urls, m.GetClientUrls()...) @@ -364,32 +457,38 @@ func (c *baseClient) updateURLs(members []*pdpb.Member) { c.urls.Store(urls) // Update the connection contexts when member changes if TSO Follower Proxy is enabled. if c.option.getEnableTSOFollowerProxy() { - c.scheduleUpdateConnectionCtxs() + // Run callbacks to refelect the membership changes in the leader and followers. + for _, cb := range c.membersChangedCallbacks { + cb() + } } log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } -func (c *baseClient) switchLeader(addrs []string) error { +func (c *pdBaseClient) switchLeader(addrs []string) error { // FIXME: How to safely compare leader urls? For now, only allows one client url. 
addr := addrs[0] - oldLeader := c.GetLeaderAddr() + oldLeader := c.getLeaderAddr() if addr == oldLeader { return nil } - if _, err := c.getOrCreateGRPCConn(addr); err != nil { + if _, err := c.GetOrCreateGRPCConn(addr); err != nil { log.Warn("[pd] failed to connect leader", zap.String("leader", addr), errs.ZapError(err)) return err } // Set PD leader and Global TSO Allocator (which is also the PD leader) c.leader.Store(addr) - c.allocators.Store(globalDCLocation, addr) - c.scheduleUpdateTokenConnection() + c.tsoAllocators.Store(globalDCLocation, addr) + // Run callbacks + for _, cb := range c.leaderSwitchedCallbacks { + cb() + } log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader)) return nil } -func (c *baseClient) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) { +func (c *pdBaseClient) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) { var addrs []string for _, member := range members { if member.GetMemberId() != leader.GetMemberId() { @@ -401,7 +500,7 @@ func (c *baseClient) updateFollowers(members []*pdpb.Member, leader *pdpb.Member c.followers.Store(addrs) } -func (c *baseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Member) error { +func (c *pdBaseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Member) error { if len(allocatorMap) == 0 { return nil } @@ -411,18 +510,18 @@ func (c *baseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Memb continue } addr := member.GetClientUrls()[0] - oldAddr, exist := c.getAllocatorLeaderAddrByDCLocation(dcLocation) + oldAddr, exist := c.GetTSOAllocatorServingAddrByDCLocation(dcLocation) if exist && addr == oldAddr { continue } - if _, err := c.getOrCreateGRPCConn(addr); err != nil { + if _, err := c.GetOrCreateGRPCConn(addr); err != nil { log.Warn("[pd] failed to connect dc tso allocator leader", zap.String("dc-location", dcLocation), zap.String("leader", addr), errs.ZapError(err)) return err } - c.allocators.Store(dcLocation, addr) + c.tsoAllocators.Store(dcLocation, addr) log.Info("[pd] switch dc tso allocator leader", zap.String("dc-location", dcLocation), zap.String("new-leader", addr), @@ -433,34 +532,7 @@ func (c *baseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Memb return nil } -func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { - conn, ok := c.clientConns.Load(addr) - if ok { - return conn.(*grpc.ClientConn), nil - } - tlsCfg, err := tlsutil.TLSConfig{ - CAPath: c.security.CAPath, - CertPath: c.security.CertPath, - KeyPath: c.security.KeyPath, - - SSLCABytes: c.security.SSLCABytes, - SSLCertBytes: c.security.SSLCertBytes, - SSLKEYBytes: c.security.SSLKEYBytes, - }.ToTLSConfig() - if err != nil { - return nil, err - } - dCtx, cancel := context.WithTimeout(c.ctx, dialTimeout) - defer cancel() - cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.option.gRPCDialOptions...) - if err != nil { - return nil, err - } - if old, ok := c.clientConns.Load(addr); ok { - cc.Close() - log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String())) - return old.(*grpc.ClientConn), nil - } - c.clientConns.Store(addr, cc) - return cc, nil +// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr +func (c *pdBaseClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) 
} diff --git a/client/client.go b/client/client.go index ea020ea6937..d6ed402f34a 100644 --- a/client/client.go +++ b/client/client.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tlsutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -191,153 +192,6 @@ func WithBuckets() GetRegionOption { return func(op *GetRegionOp) { op.needBuckets = true } } -type tsoRequest struct { - start time.Time - clientCtx context.Context - requestCtx context.Context - done chan error - physical int64 - logical int64 - dcLocation string -} - -type tsoBatchController struct { - maxBatchSize int - // bestBatchSize is a dynamic size that changed based on the current batch effect. - bestBatchSize int - - tsoRequestCh chan *tsoRequest - collectedRequests []*tsoRequest - collectedRequestCount int - - batchStartTime time.Time -} - -func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController { - return &tsoBatchController{ - maxBatchSize: maxBatchSize, - bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it will be converged to (current_batch_size - 4) */ - tsoRequestCh: tsoRequestCh, - collectedRequests: make([]*tsoRequest, maxBatchSize+1), - collectedRequestCount: 0, - } -} - -// fetchPendingRequests will start a new round of the batch collecting from the channel. -// It returns true if everything goes well, otherwise false which means we should stop the service. -func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error { - var firstTSORequest *tsoRequest - select { - case <-ctx.Done(): - return ctx.Err() - case firstTSORequest = <-tbc.tsoRequestCh: - } - // Start to batch when the first TSO request arrives. - tbc.batchStartTime = time.Now() - tbc.collectedRequestCount = 0 - tbc.pushRequest(firstTSORequest) - - // This loop is for trying best to collect more requests, so we use `tbc.maxBatchSize` here. -fetchPendingRequestsLoop: - for tbc.collectedRequestCount < tbc.maxBatchSize { - select { - case tsoReq := <-tbc.tsoRequestCh: - tbc.pushRequest(tsoReq) - case <-ctx.Done(): - return ctx.Err() - default: - break fetchPendingRequestsLoop - } - } - - // Check whether we should fetch more pending TSO requests from the channel. - // TODO: maybe consider the actual load that returns through a TSO response from PD server. - if tbc.collectedRequestCount >= tbc.maxBatchSize || maxBatchWaitInterval <= 0 { - return nil - } - - // Fetches more pending TSO requests from the channel. - // Try to collect `tbc.bestBatchSize` requests, or wait `maxBatchWaitInterval` - // when `tbc.collectedRequestCount` is less than the `tbc.bestBatchSize`. - if tbc.collectedRequestCount < tbc.bestBatchSize { - after := time.NewTimer(maxBatchWaitInterval) - defer after.Stop() - for tbc.collectedRequestCount < tbc.bestBatchSize { - select { - case tsoReq := <-tbc.tsoRequestCh: - tbc.pushRequest(tsoReq) - case <-ctx.Done(): - return ctx.Err() - case <-after.C: - return nil - } - } - } - - // Do an additional non-block try. Here we test the length with `tbc.maxBatchSize` instead - // of `tbc.bestBatchSize` because trying best to fetch more requests is necessary so that - // we can adjust the `tbc.bestBatchSize` dynamically later. 
- for tbc.collectedRequestCount < tbc.maxBatchSize { - select { - case tsoReq := <-tbc.tsoRequestCh: - tbc.pushRequest(tsoReq) - case <-ctx.Done(): - return ctx.Err() - default: - return nil - } - } - return nil -} - -func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) { - tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq - tbc.collectedRequestCount++ -} - -func (tbc *tsoBatchController) getCollectedRequests() []*tsoRequest { - return tbc.collectedRequests[:tbc.collectedRequestCount] -} - -// adjustBestBatchSize stabilizes the latency with the AIAD algorithm. -func (tbc *tsoBatchController) adjustBestBatchSize() { - tsoBestBatchSize.Observe(float64(tbc.bestBatchSize)) - length := tbc.collectedRequestCount - if length < tbc.bestBatchSize && tbc.bestBatchSize > 1 { - // Waits too long to collect requests, reduce the target batch size. - tbc.bestBatchSize-- - } else if length > tbc.bestBatchSize+4 /* Hard-coded number, in order to make `tbc.bestBatchSize` stable */ && - tbc.bestBatchSize < tbc.maxBatchSize { - tbc.bestBatchSize++ - } -} - -func (tbc *tsoBatchController) revokePendingTSORequest(err error) { - for i := 0; i < len(tbc.tsoRequestCh); i++ { - req := <-tbc.tsoRequestCh - req.done <- err - } -} - -type tsoDispatcher struct { - dispatcherCancel context.CancelFunc - tsoBatchController *tsoBatchController -} - -type lastTSO struct { - physical int64 - logical int64 -} - -const ( - dialTimeout = 3 * time.Second - updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. - tsLoopDCCheckInterval = time.Minute - defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst - retryInterval = 500 * time.Millisecond - maxRetryTimes = 6 -) - // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time. var LeaderHealthCheckInterval = time.Second @@ -383,8 +237,11 @@ func WithMaxErrorRetry(count int) ClientOption { } } +var _ Client = (*client)(nil) + type client struct { - *baseClient + bc BaseClient + tsoStreamBuilderFactory // tsoDispatcher is used to dispatch different TSO requests to // the corresponding dc-location TSO channel. tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest @@ -396,39 +253,131 @@ type client struct { tokenDispatcher *tokenDispatcher // For internal usage. - checkTSDeadlineCh chan struct{} - leaderNetworkFailure int32 + checkTSDeadlineCh chan struct{} + checkTSODispatcherCh chan struct{} + updateTSOConnectionCtxsCh chan struct{} + updateTokenConnectionCh chan struct{} + leaderNetworkFailure int32 + wg sync.WaitGroup + + ctx context.Context + cancel context.CancelFunc + + option *option +} + +// SecurityOption records options about tls +type SecurityOption struct { + CAPath string + CertPath string + KeyPath string + + SSLCABytes []byte + SSLCertBytes []byte + SSLKEYBytes []byte } // NewClient creates a PD client. -func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { - return NewClientWithContext(context.Background(), pdAddrs, security, opts...) +func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + return NewClientWithContext(context.Background(), svrAddrs, security, opts...) } // NewClientWithContext creates a PD client with context. 
-func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { - log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs)) +func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", svrAddrs)) + c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security) + c.tsoStreamBuilderFactory = &pdTSOStreamBuilderFactory{} + c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) + if err := c.setup(opts...); err != nil { + return nil, err + } + return c, nil +} + +// NewTSOClientWithContext creates a TSO client with context. +func NewTSOClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + log.Info("[pd(tso)] create tso client with endpoints", zap.Strings("tso-address", svrAddrs)) + c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security) + c.tsoStreamBuilderFactory = &tsoTSOStreamBuilderFactory{} + c.bc = newTSOBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) + if err := c.setup(opts...); err != nil { + return nil, err + } + return c, nil +} + +func createClient(ctx context.Context, security *SecurityOption) (*client, context.Context, context.CancelFunc, *tlsutil.TLSConfig) { + tlsCfg := &tlsutil.TLSConfig{ + CAPath: security.CAPath, + CertPath: security.CertPath, + KeyPath: security.KeyPath, + + SSLCABytes: security.SSLCABytes, + SSLCertBytes: security.SSLCertBytes, + SSLKEYBytes: security.SSLKEYBytes, + } + + clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ - baseClient: newBaseClient(ctx, addrsToUrls(pdAddrs), security), - checkTSDeadlineCh: make(chan struct{}), + checkTSDeadlineCh: make(chan struct{}), + checkTSODispatcherCh: make(chan struct{}, 1), + updateTSOConnectionCtxsCh: make(chan struct{}, 1), + updateTokenConnectionCh: make(chan struct{}, 1), + ctx: clientCtx, + cancel: clientCancel, + option: newOption(), } + + return c, clientCtx, clientCancel, tlsCfg +} + +func (c *client) setup(opts ...ClientOption) error { // Inject the client options. for _, opt := range opts { opt(c) } // Init the client base. - if err := c.init(); err != nil { - return nil, err + if err := c.bc.Init(); err != nil { + return err } - // Start the daemons. + + // Register callbacks + c.bc.AddTSOAllocatorServingAddrSwitchedCallback(c.scheduleCheckTSODispatcher) + c.bc.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) + c.bc.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection) + + // Create dispatchers c.updateTSODispatcher() c.createTokenDispatcher() + + // Start the daemons. c.wg.Add(3) go c.tsLoop() go c.tsCancelLoop() go c.leaderCheckLoop() + return nil +} - return c, nil +func (c *client) scheduleUpdateTokenConnection() { + select { + case c.updateTokenConnectionCh <- struct{}{}: + default: + } +} + +// GetClusterID returns the ClusterID. +func (c *client) GetClusterID(ctx context.Context) uint64 { + return c.bc.GetClusterID(ctx) +} + +// GetLeaderAddr returns the leader address. +func (c *client) GetLeaderAddr() string { + return c.bc.GetServingAddr() +} + +// GetBaseClient returns BaseClient which contains service discovery client logic +func (c *client) GetBaseClient() BaseClient { + return c.bc } // UpdateOption updates the client option. 
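// An illustrative sketch, not part of this change: how a consumer of the new BaseClient
// interface is expected to wire itself up now that the base client notifies the upper
// client through registered callbacks (as client.setup() does above) instead of owning
// the scheduling channels itself. The `watcher` type and its names are hypothetical and
// only assume the BaseClient interface defined in base_client.go.
type watcher struct {
	bc           BaseClient
	leaderSwitch chan struct{}
}

func newWatcher(bc BaseClient) (*watcher, error) {
	w := &watcher{bc: bc, leaderSwitch: make(chan struct{}, 1)}
	// Resolve the cluster and start the membership loop first, as client.setup() does.
	if err := bc.Init(); err != nil {
		return nil, err
	}
	// Then register callbacks so that later leader/primary switches are observed.
	bc.AddServingAddrSwitchedCallback(func() {
		select {
		case w.leaderSwitch <- struct{}{}:
		default: // collapse a burst of switches into one pending notification
		}
	})
	return w, nil
}

// onRequestError nudges service discovery to re-check membership without blocking the caller.
func (w *watcher) onRequestError() {
	w.bc.ScheduleCheckMemberChanged()
}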
@@ -454,31 +403,6 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error { return nil } -func (c *client) updateTSODispatcher() { - // Set up the new TSO dispatcher and batch controller. - c.allocators.Range(func(dcLocationKey, _ interface{}) bool { - dcLocation := dcLocationKey.(string) - if !c.checkTSODispatcher(dcLocation) { - c.createTSODispatcher(dcLocation) - } - return true - }) - // Clean up the unused TSO dispatcher - c.tsoDispatcher.Range(func(dcLocationKey, _ interface{}) bool { - dcLocation := dcLocationKey.(string) - // Skip the Global TSO Allocator - if dcLocation == globalDCLocation { - return true - } - if dispatcher, exist := c.allocators.Load(dcLocation); !exist { - log.Info("[pd] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - dispatcher.(*tsoDispatcher).dispatcherCancel() - c.tsoDispatcher.Delete(dcLocation) - } - return true - }) -} - func (c *client) leaderCheckLoop() { defer c.wg.Done() @@ -501,8 +425,8 @@ func (c *client) leaderCheckLoop() { func (c *client) checkLeaderHealth(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok { - healthCli := healthpb.NewHealthClient(cc.(*grpc.ClientConn)) + if client := c.bc.GetServingEndpointClientConn(); client != nil { + healthCli := healthpb.NewHealthClient(client) resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) rpcErr, ok := status.FromError(err) failpoint.Inject("unreachableNetwork1", func() { @@ -517,80 +441,6 @@ func (c *client) checkLeaderHealth(ctx context.Context) { } } -type deadline struct { - timer <-chan time.Time - done chan struct{} - cancel context.CancelFunc -} - -func (c *client) tsCancelLoop() { - defer c.wg.Done() - - tsCancelLoopCtx, tsCancelLoopCancel := context.WithCancel(c.ctx) - defer tsCancelLoopCancel() - - ticker := time.NewTicker(tsLoopDCCheckInterval) - defer ticker.Stop() - for { - // Watch every dc-location's tsDeadlineCh - c.allocators.Range(func(dcLocation, _ interface{}) bool { - c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string)) - return true - }) - select { - case <-c.checkTSDeadlineCh: - continue - case <-ticker.C: - continue - case <-tsCancelLoopCtx.Done(): - return - } - } -} - -func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) { - if _, exist := c.tsDeadline.Load(dcLocation); !exist { - tsDeadlineCh := make(chan deadline, 1) - c.tsDeadline.Store(dcLocation, tsDeadlineCh) - go func(dc string, tsDeadlineCh <-chan deadline) { - for { - select { - case d := <-tsDeadlineCh: - select { - case <-d.timer: - log.Error("[pd] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) - d.cancel() - case <-d.done: - continue - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } - } - }(dcLocation, tsDeadlineCh) - } -} - -func (c *client) scheduleCheckTSDeadline() { - select { - case c.checkTSDeadlineCh <- struct{}{}: - default: - } -} - -func (c *client) checkStreamTimeout(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { - select { - case <-done: - return - case <-time.After(c.option.timeout): - cancel() - case <-streamCtx.Done(): - } - <-done -} - func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { start := time.Now() defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }() @@ -606,567 +456,6 @@ func (c *client) GetAllMembers(ctx context.Context) 
([]*pdpb.Member, error) { return resp.GetMembers(), nil } -func (c *client) tsLoop() { - defer c.wg.Done() - - loopCtx, loopCancel := context.WithCancel(c.ctx) - defer loopCancel() - - ticker := time.NewTicker(tsLoopDCCheckInterval) - defer ticker.Stop() - for { - c.updateTSODispatcher() - select { - case <-ticker.C: - case <-c.checkTSODispatcherCh: - case <-loopCtx.Done(): - return - } - } -} - -func (c *client) createTsoStream(ctx context.Context, cancel context.CancelFunc, client pdpb.PDClient) (pdpb.PD_TsoClient, error) { - done := make(chan struct{}) - // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. - go c.checkStreamTimeout(ctx, cancel, done) - stream, err := client.Tso(ctx) - done <- struct{}{} - return stream, err -} - -func (c *client) checkAllocator( - dispatcherCtx context.Context, - forwardCancel context.CancelFunc, - dc, forwardedHostTrim, addrTrim, url string, - updateAndClear func(newAddr string, connectionCtx *connectionContext)) { - defer func() { - // cancel the forward stream - forwardCancel() - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0) - }() - cc, u := c.getAllocatorClientConnByDCLocation(dc) - healthCli := healthpb.NewHealthClient(cc) - for { - // the pd/allocator leader change, we need to re-establish the stream - if u != url { - log.Info("[pd] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) - return - } - healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) - resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) - failpoint.Inject("unreachableNetwork", func() { - resp.Status = healthpb.HealthCheckResponse_UNKNOWN - }) - healthCancel() - if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - // create a stream of the original allocator - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err := c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc)) - if err == nil && stream != nil { - log.Info("[pd] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) - updateAndClear(url, &connectionContext{url, stream, cctx, cancel}) - return - } - } - select { - case <-dispatcherCtx.Done(): - return - case <-time.After(time.Second): - // To ensure we can get the latest allocator leader - // and once the leader is changed, we can exit this function. - _, u = c.getAllocatorClientConnByDCLocation(dc) - } - } -} - -func (c *client) checkTSODispatcher(dcLocation string) bool { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok || dispatcher == nil { - return false - } - return true -} - -func (c *client) createTSODispatcher(dcLocation string) { - dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) - dispatcher := &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, defaultMaxTSOBatchSize*2), - defaultMaxTSOBatchSize), - } - // Each goroutine is responsible for handling the tso stream request for its dc-location. - // The only case that will make the dispatcher goroutine exit - // is that the loopCtx is done, otherwise there is no circumstance - // this goroutine should exit. 
- go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController) - c.tsoDispatcher.Store(dcLocation, dispatcher) - log.Info("[pd] tso dispatcher created", zap.String("dc-location", dcLocation)) -} - -func (c *client) handleDispatcher( - dispatcherCtx context.Context, - dc string, - tbc *tsoBatchController) { - var ( - err error - streamAddr string - stream pdpb.PD_TsoClient - streamCtx context.Context - cancel context.CancelFunc - // addr -> connectionContext - connectionCtxs sync.Map - opts []opentracing.StartSpanOption - ) - defer func() { - log.Info("[pd] exit tso dispatcher", zap.String("dc-location", dc)) - // Cancel all connections. - connectionCtxs.Range(func(_, cc interface{}) bool { - cc.(*connectionContext).cancel() - return true - }) - }() - // Call updateConnectionCtxs once to init the connectionCtxs first. - c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - // Only the Global TSO needs to watch the updateConnectionCtxsCh to sense the - // change of the cluster when TSO Follower Proxy is enabled. - // TODO: support TSO Follower Proxy for the Local TSO. - if dc == globalDCLocation { - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker - } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) - - for { - select { - case <-dispatcherCtx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - case <-c.updateConnectionCtxsCh: - } - c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - } - }() - } - - // Loop through each batch of TSO requests and send them for processing. - streamLoopTimer := time.NewTimer(c.option.timeout) -tsoBatchLoop: - for { - select { - case <-dispatcherCtx.Done(): - return - default: - } - // Start to collect the TSO requests. - maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() - if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { - if err == context.Canceled { - log.Info("[pd] stop fetching the pending tso requests due to context canceled", - zap.String("dc-location", dc)) - } else { - log.Error("[pd] fetch pending tso requests error", - zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSO, err)) - } - return - } - if maxBatchWaitInterval >= 0 { - tbc.adjustBestBatchSize() - } - streamLoopTimer.Reset(c.option.timeout) - // Choose a stream to send the TSO gRPC request. - streamChoosingLoop: - for { - connectionCtx := c.chooseStream(&connectionCtxs) - if connectionCtx != nil { - streamAddr, stream, streamCtx, cancel = connectionCtx.streamAddr, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel - } - // Check stream and retry if necessary. 
- if stream == nil { - log.Info("[pd] tso stream is not ready", zap.String("dc", dc)) - if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { - continue streamChoosingLoop - } - select { - case <-dispatcherCtx.Done(): - return - case <-streamLoopTimer.C: - err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) - log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) - c.ScheduleCheckLeader() - c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) - continue tsoBatchLoop - case <-time.After(retryInterval): - continue streamChoosingLoop - } - } - select { - case <-streamCtx.Done(): - log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) - // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. - connectionCtxs.Delete(streamAddr) - cancel() - stream = nil - continue - default: - break streamChoosingLoop - } - } - done := make(chan struct{}) - dl := deadline{ - timer: time.After(c.option.timeout), - done: done, - cancel: cancel, - } - tsDeadlineCh, ok := c.tsDeadline.Load(dc) - for !ok || tsDeadlineCh == nil { - c.scheduleCheckTSDeadline() - time.Sleep(time.Millisecond * 100) - tsDeadlineCh, ok = c.tsDeadline.Load(dc) - } - select { - case <-dispatcherCtx.Done(): - return - case tsDeadlineCh.(chan deadline) <- dl: - } - opts = extractSpanReference(tbc, opts[:0]) - err = c.processTSORequests(stream, dc, tbc, opts) - close(done) - // If error happens during tso stream handling, reset stream and run the next trial. - if err != nil { - select { - case <-dispatcherCtx.Done(): - return - default: - } - c.ScheduleCheckLeader() - log.Error("[pd] getTS error", zap.String("dc-location", dc), zap.String("stream-addr", streamAddr), errs.ZapError(errs.ErrClientGetTSO, err)) - // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. - connectionCtxs.Delete(streamAddr) - cancel() - stream = nil - // Because ScheduleCheckLeader is asynchronous, if the leader changes, we better call `updateMember` ASAP. - if IsLeaderChange(err) { - if err := c.updateMember(); err != nil { - select { - case <-dispatcherCtx.Done(): - return - default: - } - } - // Because the TSO Follower Proxy could be configured online, - // If we change it from on -> off, background updateConnectionCtxs - // will cancel the current stream, then the EOF error caused by cancel() - // should not trigger the updateConnectionCtxs here. - // So we should only call it when the leader changes. - c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - } - } - } -} - -// TSO Follower Proxy only supports the Global TSO proxy now. -func (c *client) allowTSOFollowerProxy(dc string) bool { - return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() -} - -// chooseStream uses the reservoir sampling algorithm to randomly choose a connection. -// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. -func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *connectionContext) { - idx := 0 - connectionCtxs.Range(func(addr, cc interface{}) bool { - j := rand.Intn(idx + 1) - if j < 1 { - connectionCtx = cc.(*connectionContext) - } - idx++ - return true - }) - return connectionCtx -} - -type connectionContext struct { - streamAddr string - // Current stream to send gRPC requests, maybe a leader or a follower. 
- stream pdpb.PD_TsoClient - ctx context.Context - cancel context.CancelFunc -} - -func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { - // Normal connection creating, it will be affected by the `enableForwarding`. - createTSOConnection := c.tryConnect - if c.allowTSOFollowerProxy(dc) { - createTSOConnection = c.tryConnectWithProxy - } - if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { - log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) - return false - } - return true -} - -// tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable -// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, -// while a new daemon will be created also to switch back to a normal leader connection ASAP the -// connection comes back to normal. -func (c *client) tryConnect( - dispatcherCtx context.Context, - dc string, - connectionCtxs *sync.Map, -) error { - var ( - networkErrNum uint64 - err error - stream pdpb.PD_TsoClient - url string - cc *grpc.ClientConn - ) - updateAndClear := func(newAddr string, connectionCtx *connectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded { - // If the previous connection still exists, we should close it first. - cc.(*connectionContext).cancel() - connectionCtxs.Store(newAddr, connectionCtx) - } - connectionCtxs.Range(func(addr, cc interface{}) bool { - if addr.(string) != newAddr { - cc.(*connectionContext).cancel() - connectionCtxs.Delete(addr) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens - - for i := 0; i < maxRetryTimes; i++ { - c.ScheduleCheckLeader() - cc, url = c.getAllocatorClientConnByDCLocation(dc) - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc)) - failpoint.Inject("unreachableNetwork", func() { - stream = nil - err = status.New(codes.Unavailable, "unavailable").Err() - }) - if stream != nil && err == nil { - updateAndClear(url, &connectionContext{url, stream, cctx, cancel}) - return nil - } - - if err != nil && c.option.enableForwarding { - // The reason we need to judge if the error code is equal to "Canceled" here is that - // when we create a stream we use a goroutine to manually control the timeout of the connection. - // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. - // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. - // And actually the `Canceled` error can be regarded as a kind of network error in some way. 
- if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { - networkErrNum++ - } - } - - cancel() - select { - case <-dispatcherCtx.Done(): - return err - case <-time.After(retryInterval): - } - } - - if networkErrNum == maxRetryTimes { - // encounter the network error - followerClient, addr := c.followerClient() - if followerClient != nil { - log.Info("[pd] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) - forwardedHost, ok := c.getAllocatorLeaderAddrByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - - // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - stream, err = c.createTsoStream(cctx, cancel, followerClient) - if err == nil { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addrTrim := trimHTTPPrefix(addr) - // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, updateAndClear) - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) - updateAndClear(addr, &connectionContext{addr, stream, cctx, cancel}) - return nil - } - cancel() - } - } - return err -} - -// tryConnectWithProxy will create multiple streams to all the PD servers to work as a TSO proxy to reduce -// the pressure of PD leader. -func (c *client) tryConnectWithProxy( - dispatcherCtx context.Context, - dc string, - connectionCtxs *sync.Map, -) error { - clients := c.getAllClients() - leaderAddr := c.GetLeaderAddr() - forwardedHost, ok := c.getAllocatorLeaderAddrByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - // GC the stale one. - connectionCtxs.Range(func(addr, cc interface{}) bool { - if _, ok := clients[addr.(string)]; !ok { - cc.(*connectionContext).cancel() - connectionCtxs.Delete(addr) - } - return true - }) - // Update the missing one. - for addr, client := range clients { - if _, ok = connectionCtxs.Load(addr); ok { - continue - } - cctx, cancel := context.WithCancel(dispatcherCtx) - // Do not proxy the leader client. - if addr != leaderAddr { - log.Info("[pd] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr)) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - } - // Create the TSO stream. 
- stream, err := c.createTsoStream(cctx, cancel, client) - if err == nil { - if addr != leaderAddr { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addrTrim := trimHTTPPrefix(addr) - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) - } - connectionCtxs.Store(addr, &connectionContext{addr, stream, cctx, cancel}) - continue - } - log.Error("[pd] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) - cancel() - } - return nil -} - -func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption { - for _, req := range tbc.getCollectedRequests() { - if span := opentracing.SpanFromContext(req.requestCtx); span != nil { - opts = append(opts, opentracing.ChildOf(span.Context())) - } - } - return opts -} - -func (c *client) processTSORequests(stream pdpb.PD_TsoClient, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error { - if len(opts) > 0 { - span := opentracing.StartSpan("pdclient.processTSORequests", opts...) - defer span.Finish() - } - start := time.Now() - requests := tbc.getCollectedRequests() - count := int64(len(requests)) - req := &pdpb.TsoRequest{ - Header: c.requestHeader(), - Count: uint32(count), - DcLocation: dcLocation, - } - - if err := stream.Send(req); err != nil { - err = errors.WithStack(err) - c.finishTSORequest(requests, 0, 0, 0, err) - return err - } - tsoBatchSendLatency.Observe(float64(time.Since(tbc.batchStartTime))) - resp, err := stream.Recv() - if err != nil { - err = errors.WithStack(err) - c.finishTSORequest(requests, 0, 0, 0, err) - return err - } - requestDurationTSO.Observe(time.Since(start).Seconds()) - tsoBatchSize.Observe(float64(count)) - - if resp.GetCount() != uint32(count) { - err = errors.WithStack(errTSOLength) - c.finishTSORequest(requests, 0, 0, 0, err) - return err - } - - physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits() - // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := addLogical(logical, -count+1, suffixBits) - c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count) - c.finishTSORequest(requests, physical, firstLogical, suffixBits, nil) - return nil -} - -// Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<= tbc.maxBatchSize || maxBatchWaitInterval <= 0 { + return nil + } + + // Fetches more pending TSO requests from the channel. + // Try to collect `tbc.bestBatchSize` requests, or wait `maxBatchWaitInterval` + // when `tbc.collectedRequestCount` is less than the `tbc.bestBatchSize`. + if tbc.collectedRequestCount < tbc.bestBatchSize { + after := time.NewTimer(maxBatchWaitInterval) + defer after.Stop() + for tbc.collectedRequestCount < tbc.bestBatchSize { + select { + case tsoReq := <-tbc.tsoRequestCh: + tbc.pushRequest(tsoReq) + case <-ctx.Done(): + return ctx.Err() + case <-after.C: + return nil + } + } + } + + // Do an additional non-block try. Here we test the length with `tbc.maxBatchSize` instead + // of `tbc.bestBatchSize` because trying best to fetch more requests is necessary so that + // we can adjust the `tbc.bestBatchSize` dynamically later. 
+ for tbc.collectedRequestCount < tbc.maxBatchSize { + select { + case tsoReq := <-tbc.tsoRequestCh: + tbc.pushRequest(tsoReq) + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } + return nil +} + +func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) { + tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq + tbc.collectedRequestCount++ +} + +func (tbc *tsoBatchController) getCollectedRequests() []*tsoRequest { + return tbc.collectedRequests[:tbc.collectedRequestCount] +} + +// adjustBestBatchSize stabilizes the latency with the AIAD algorithm. +func (tbc *tsoBatchController) adjustBestBatchSize() { + tsoBestBatchSize.Observe(float64(tbc.bestBatchSize)) + length := tbc.collectedRequestCount + if length < tbc.bestBatchSize && tbc.bestBatchSize > 1 { + // Waits too long to collect requests, reduce the target batch size. + tbc.bestBatchSize-- + } else if length > tbc.bestBatchSize+4 /* Hard-coded number, in order to make `tbc.bestBatchSize` stable */ && + tbc.bestBatchSize < tbc.maxBatchSize { + tbc.bestBatchSize++ + } +} + +func (tbc *tsoBatchController) revokePendingTSORequest(err error) { + for i := 0; i < len(tbc.tsoRequestCh); i++ { + req := <-tbc.tsoRequestCh + req.done <- err + } +} diff --git a/client/tso_client.go b/client/tso_client.go index 72953956bc8..1dbd33856aa 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -14,7 +14,21 @@ package pd -import "context" +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tlsutil" + "go.uber.org/zap" + "google.golang.org/grpc" +) // TSOClient manages resource group info and token request. type TSOClient interface { @@ -35,7 +49,7 @@ type TSOClient interface { // here is in a basic manner and only for testing and integration purpose -- no batching, // no async, no pooling, no forwarding, no retry and no deliberate error handling. func (c *client) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (physical int64, logical int64, err error) { - resp := c.GetTSAsync(ctx) + resp := c.GetTSWithinKeyspaceAsync(ctx, keyspaceID) return resp.Wait() } @@ -55,5 +69,233 @@ func (c *client) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32 // without block the caller. 
// TODO: implement the following API func (c *client) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) TSFuture { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("GetLocalTSWithinKeyspaceAsync", opentracing.ChildOf(span.Context())) + ctx = opentracing.ContextWithSpan(ctx, span) + } + req := tsoReqPool.Get().(*tsoRequest) + req.requestCtx = ctx + req.clientCtx = c.ctx + req.start = time.Now() + req.dcLocation = dcLocation + req.keyspaceID = keyspaceID + if err := c.dispatchRequest(dcLocation, req); err != nil { + // Wait for a while and try again + time.Sleep(50 * time.Millisecond) + if err = c.dispatchRequest(dcLocation, req); err != nil { + req.done <- err + } + } + return req +} + +var _ BaseClient = (*tsoBaseClient)(nil) + +// tsoBaseClient is the service discovery client of TSO microservice which is primary/standby configured +type tsoBaseClient struct { + urls atomic.Value // Store as []string + // TSO Primary URL + primary atomic.Value // Store as string + // TSO Secondary URLs + secondaries atomic.Value // Store as []string + + // addr -> a gRPC connection + clientConns sync.Map // Store as map[string]*grpc.ClientConn + // dc-location -> TSO allocator primary URL + tsoAllocators sync.Map // Store as map[string]string + + // primarySwitchedCallbacks will be called after the primary swichted + primarySwitchedCallbacks []func() + // membersChangedCallbacks will be called after there is any membership + // change in the primary and followers + membersChangedCallbacks []func() + // tsoAllocatorLeaderSwitchedCallback will be called when any keyspace group tso + // allocator primary is switched. + tsoAllocatorLeaderSwitchedCallback []func() + + checkMembershipCh chan struct{} + + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + tlsCfg *tlsutil.TLSConfig + + // Client option. + option *option +} + +// newTSOBaseClient returns a new baseClient. +func newTSOBaseClient(ctx context.Context, cancel context.CancelFunc, + wg *sync.WaitGroup, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) BaseClient { + bc := &tsoBaseClient{ + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + tlsCfg: tlsCfg, + option: option, + } + bc.urls.Store(urls) + // TODO: fill the missing part for service discovery + bc.switchPrimary(urls) + + _, err := bc.GetOrCreateGRPCConn(bc.getPrimaryAddr()) + if err != nil { + return nil + } + + return bc +} + +// Init initialize the concrete client underlying +func (c *tsoBaseClient) Init() error { + return nil +} + +// Close all grpc client connnections +func (c *tsoBaseClient) CloseClientConns() { + c.clientConns.Range(func(_, cc interface{}) bool { + if err := cc.(*grpc.ClientConn).Close(); err != nil { + log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) + } + return true + }) +} + +// GetClusterID returns the ID of the cluster +func (c *tsoBaseClient) GetClusterID(context.Context) uint64 { + return 0 +} + +// GetURLs returns the URLs of the servers. +// For testing use. It should only be called when the client is closed. 
+func (c *tsoBaseClient) GetURLs() []string {
+	return c.urls.Load().([]string)
+}
+
+// GetTSOAllocators returns {dc-location -> TSO allocator primary URL} connection map
+func (c *tsoBaseClient) GetTSOAllocators() *sync.Map {
+	return &c.tsoAllocators
+}
+
+// GetTSOAllocatorServingAddrByDCLocation returns the TSO allocator serving address of the given dcLocation
+func (c *tsoBaseClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) {
+	url, exist := c.tsoAllocators.Load(dcLocation)
+	if !exist {
+		return "", false
+	}
+	return url.(string), true
+}
+
+// GetTSOAllocatorClientConnByDCLocation returns the TSO allocator gRPC client connection
+// of the given dcLocation
+func (c *tsoBaseClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) {
+	url, ok := c.tsoAllocators.Load(dcLocation)
+	if !ok {
+		panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation))
+	}
+	cc, ok := c.clientConns.Load(url)
+	if !ok {
+		panic(fmt.Sprintf("the client connection of %s in %s should exist", url, dcLocation))
+	}
+	return cc.(*grpc.ClientConn), url.(string)
+}
+
+// GetServingEndpointClientConn returns the gRPC client connection of the serving endpoint,
+// which is the primary in a primary/secondary configured cluster.
+func (c *tsoBaseClient) GetServingEndpointClientConn() *grpc.ClientConn {
+	if cc, ok := c.clientConns.Load(c.getPrimaryAddr()); ok {
+		return cc.(*grpc.ClientConn)
+	}
+	return nil
+}
+
+// GetServingAddr returns the serving endpoint which is the primary in a
+// primary/secondary configured cluster.
+func (c *tsoBaseClient) GetServingAddr() string {
+	return c.getPrimaryAddr()
+}
+
+// GetBackupAddrs gets the addresses of the current reachable and healthy
+// backup service endpoints randomly. Backup service endpoints are secondaries in
+// a primary/secondary configured cluster.
+func (c *tsoBaseClient) GetBackupAddrs() []string {
+	return c.getSecondaryAddrs()
+}
+
+// GetOrCreateGRPCConn returns the corresponding gRPC client connection of the given addr
+func (c *tsoBaseClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
+	return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
+}
+
+// ScheduleCheckMemberChanged is used to trigger a check to see if there is any
+// membership change among the primary/secondaries in a primary/secondary configured cluster.
+func (c *tsoBaseClient) ScheduleCheckMemberChanged() {
+}
+
+// CheckMemberChanged immediately checks if there is any membership change among the
+// primary/secondaries in a primary/secondary configured cluster.
+func (c *tsoBaseClient) CheckMemberChanged() error {
+	return nil
+}
+
+// AddServingAddrSwitchedCallback adds callbacks which will be called when the primary in
+// a primary/secondary configured cluster is switched.
+func (c *tsoBaseClient) AddServingAddrSwitchedCallback(callbacks ...func()) {
+	c.primarySwitchedCallbacks = append(c.primarySwitchedCallbacks, callbacks...)
+}
+
+// AddServiceAddrsSwitchedCallback adds callbacks which will be called when any primary/secondary
+// in a primary/secondary configured cluster is changed.
+func (c *tsoBaseClient) AddServiceAddrsSwitchedCallback(callbacks ...func()) {
+	c.membersChangedCallbacks = append(c.membersChangedCallbacks, callbacks...)
+}
+
+// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called
+// when any keyspace group tso allocator primary is switched.
+func (c *tsoBaseClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { + c.tsoAllocatorLeaderSwitchedCallback = append(c.tsoAllocatorLeaderSwitchedCallback, callbacks...) +} + +// getPrimaryAddr returns the primary address. +func (c *tsoBaseClient) getPrimaryAddr() string { + primaryAddr := c.primary.Load() + if primaryAddr == nil { + return "" + } + return primaryAddr.(string) +} + +// getSecondaryAddrs returns the secondary addresses. +func (c *tsoBaseClient) getSecondaryAddrs() []string { + secondaryAddrs := c.secondaries.Load() + if secondaryAddrs == nil { + return []string{} + } + return secondaryAddrs.([]string) +} + +func (c *tsoBaseClient) switchPrimary(addrs []string) error { + // FIXME: How to safely compare primary urls? For now, only allows one client url. + addr := addrs[0] + oldPrimary := c.getPrimaryAddr() + if addr == oldPrimary { + return nil + } + + if _, err := c.GetOrCreateGRPCConn(addr); err != nil { + log.Warn("[pd] failed to connect primary", zap.String("primary", addr), errs.ZapError(err)) + return err + } + // Set PD primary and Global TSO Allocator (which is also the PD primary) + c.primary.Store(addr) + c.tsoAllocators.Store(globalDCLocation, addr) + // Run callbacks + for _, cb := range c.primarySwitchedCallbacks { + cb() + } + log.Info("[tso] switch primary", zap.String("new-primary", addr), zap.String("old-primary", oldPrimary)) return nil } diff --git a/client/tso_request_dispatcher.go b/client/tso_request_dispatcher.go new file mode 100644 index 00000000000..9926f2950f9 --- /dev/null +++ b/client/tso_request_dispatcher.go @@ -0,0 +1,751 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +type tsoDispatcher struct { + dispatcherCancel context.CancelFunc + tsoBatchController *tsoBatchController +} + +type lastTSO struct { + physical int64 + logical int64 +} + +const ( + dialTimeout = 3 * time.Second + updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. 
+ tsLoopDCCheckInterval = time.Minute + defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst + retryInterval = 500 * time.Millisecond + maxRetryTimes = 6 +) + +func (c *client) scheduleCheckTSODispatcher() { + select { + case c.checkTSODispatcherCh <- struct{}{}: + default: + } +} + +func (c *client) scheduleUpdateTSOConnectionCtxs() { + select { + case c.updateTSOConnectionCtxsCh <- struct{}{}: + default: + } +} + +func (c *client) dispatchRequest(dcLocation string, request *tsoRequest) error { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok { + err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation)) + log.Error("[pd/tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err)) + c.bc.ScheduleCheckMemberChanged() + return err + } + dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + return nil +} + +// TSFuture is a future which promises to return a TSO. +type TSFuture interface { + // Wait gets the physical and logical time, it would block caller if data is not available yet. + Wait() (int64, int64, error) +} + +func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { + // If tso command duration is observed very high, the reason could be it + // takes too long for Wait() be called. + start := time.Now() + cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds()) + select { + case err = <-req.done: + err = errors.WithStack(err) + defer tsoReqPool.Put(req) + if err != nil { + cmdFailDurationTSO.Observe(time.Since(req.start).Seconds()) + return 0, 0, err + } + physical, logical = req.physical, req.logical + now := time.Now() + cmdDurationWait.Observe(now.Sub(start).Seconds()) + cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) + return + case <-req.requestCtx.Done(): + return 0, 0, errors.WithStack(req.requestCtx.Err()) + case <-req.clientCtx.Done(): + return 0, 0, errors.WithStack(req.clientCtx.Err()) + } +} + +func (c *client) updateTSODispatcher() { + // Set up the new TSO dispatcher and batch controller. 
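+	// Every dc-location that currently owns a TSO allocator gets its own dispatcher;
+	// a dc-location without one below means it was just discovered.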
+	c.bc.GetTSOAllocators().Range(func(dcLocationKey, _ interface{}) bool {
+		dcLocation := dcLocationKey.(string)
+		if !c.checkTSODispatcher(dcLocation) {
+			c.createTSODispatcher(dcLocation)
+		}
+		return true
+	})
+	// Clean up the unused TSO dispatchers
+	c.tsoDispatcher.Range(func(dcLocationKey, dispatcher interface{}) bool {
+		dcLocation := dcLocationKey.(string)
+		// Skip the Global TSO Allocator
+		if dcLocation == globalDCLocation {
+			return true
+		}
+		if _, exist := c.bc.GetTSOAllocators().Load(dcLocation); !exist {
+			log.Info("[pd/tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation))
+			dispatcher.(*tsoDispatcher).dispatcherCancel()
+			c.tsoDispatcher.Delete(dcLocation)
+		}
+		return true
+	})
+}
+
+type deadline struct {
+	timer  <-chan time.Time
+	done   chan struct{}
+	cancel context.CancelFunc
+}
+
+func (c *client) tsCancelLoop() {
+	defer c.wg.Done()
+
+	tsCancelLoopCtx, tsCancelLoopCancel := context.WithCancel(c.ctx)
+	defer tsCancelLoopCancel()
+
+	ticker := time.NewTicker(tsLoopDCCheckInterval)
+	defer ticker.Stop()
+	for {
+		// Watch every dc-location's tsDeadlineCh
+		c.bc.GetTSOAllocators().Range(func(dcLocation, _ interface{}) bool {
+			c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
+			return true
+		})
+		select {
+		case <-c.checkTSDeadlineCh:
+			continue
+		case <-ticker.C:
+			continue
+		case <-tsCancelLoopCtx.Done():
+			return
+		}
+	}
+}
+
+func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {
+	if _, exist := c.tsDeadline.Load(dcLocation); !exist {
+		tsDeadlineCh := make(chan deadline, 1)
+		c.tsDeadline.Store(dcLocation, tsDeadlineCh)
+		go func(dc string, tsDeadlineCh <-chan deadline) {
+			for {
+				select {
+				case d := <-tsDeadlineCh:
+					select {
+					case <-d.timer:
+						log.Error("[pd/tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
+						d.cancel()
+					case <-d.done:
+						continue
+					case <-ctx.Done():
+						return
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		}(dcLocation, tsDeadlineCh)
+	}
+}
+
+func (c *client) scheduleCheckTSDeadline() {
+	select {
+	case c.checkTSDeadlineCh <- struct{}{}:
+	default:
+	}
+}
+
+func (c *client) tsLoop() {
+	defer c.wg.Done()
+
+	loopCtx, loopCancel := context.WithCancel(c.ctx)
+	defer loopCancel()
+
+	ticker := time.NewTicker(tsLoopDCCheckInterval)
+	defer ticker.Stop()
+	for {
+		c.updateTSODispatcher()
+		select {
+		case <-ticker.C:
+		case <-c.checkTSODispatcherCh:
+		case <-loopCtx.Done():
+			return
+		}
+	}
+}
+
+func (c *client) checkAllocator(
+	dispatcherCtx context.Context,
+	forwardCancel context.CancelFunc,
+	dc, forwardedHostTrim, addrTrim, url string,
+	updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) {
+	defer func() {
+		// cancel the forward stream
+		forwardCancel()
+		requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0)
+	}()
+	cc, u := c.bc.GetTSOAllocatorClientConnByDCLocation(dc)
+	healthCli := healthpb.NewHealthClient(cc)
+	for {
+		// The PD/allocator leader has changed, so we need to re-establish the stream.
+		if u != url {
+			log.Info("[pd/tso] the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u))
+			return
+		}
+		healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout)
+		resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
+		failpoint.Inject("unreachableNetwork", func() {
+			resp.Status = healthpb.HealthCheckResponse_UNKNOWN
+		})
+		healthCancel()
+		if err == nil && resp.GetStatus() ==
healthpb.HealthCheckResponse_SERVING { + // create a stream of the original allocator + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + if err == nil && stream != nil { + log.Info("[pd/tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return + } + } + select { + case <-dispatcherCtx.Done(): + return + case <-time.After(time.Second): + // To ensure we can get the latest allocator leader + // and once the leader is changed, we can exit this function. + _, u = c.bc.GetTSOAllocatorClientConnByDCLocation(dc) + } + } +} + +func (c *client) checkTSODispatcher(dcLocation string) bool { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok || dispatcher == nil { + return false + } + return true +} + +func (c *client) createTSODispatcher(dcLocation string) { + dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) + dispatcher := &tsoDispatcher{ + dispatcherCancel: dispatcherCancel, + tsoBatchController: newTSOBatchController( + make(chan *tsoRequest, defaultMaxTSOBatchSize*2), + defaultMaxTSOBatchSize), + } + // Each goroutine is responsible for handling the tso stream request for its dc-location. + // The only case that will make the dispatcher goroutine exit + // is that the loopCtx is done, otherwise there is no circumstance + // this goroutine should exit. + go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController) + c.tsoDispatcher.Store(dcLocation, dispatcher) + log.Info("[pd/tso] tso dispatcher created", zap.String("dc-location", dcLocation)) +} + +func (c *client) handleDispatcher( + dispatcherCtx context.Context, + dc string, + tbc *tsoBatchController) { + var ( + err error + streamAddr string + stream tsoStream + streamCtx context.Context + cancel context.CancelFunc + // addr -> connectionContext + connectionCtxs sync.Map + opts []opentracing.StartSpanOption + ) + defer func() { + log.Info("[pd/tso] exit tso dispatcher", zap.String("dc-location", dc)) + // Cancel all connections. + connectionCtxs.Range(func(_, cc interface{}) bool { + cc.(*tsoConnectionContext).cancel() + return true + }) + }() + // Call updateTSOConnectionCtxs once to init the connectionCtxs first. + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + // Only the Global TSO needs to watch the updateTSOConnectionCtxsCh to sense the + // change of the cluster when TSO Follower Proxy is enabled. + // TODO: support TSO Follower Proxy for the Local TSO. + if dc == globalDCLocation { + go func() { + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + select { + case <-dispatcherCtx.Done(): + return + case <-c.option.enableTSOFollowerProxyCh: + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. 
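+						// A zero-value time.Ticker has a nil C channel, and receiving from a nil
+						// channel blocks forever, so this effectively turns the periodic update off.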
+ setNewUpdateTicker(&time.Ticker{}) + } else { + // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered + continue + } + case <-updateTicker.C: + case <-c.updateTSOConnectionCtxsCh: + } + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + } + }() + } + + // Loop through each batch of TSO requests and send them for processing. + streamLoopTimer := time.NewTimer(c.option.timeout) +tsoBatchLoop: + for { + select { + case <-dispatcherCtx.Done(): + return + default: + } + // Start to collect the TSO requests. + maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() + if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + if err == context.Canceled { + log.Info("[pd/tso] stop fetching the pending tso requests due to context canceled", + zap.String("dc-location", dc)) + } else { + log.Error("[pd/tso] fetch pending tso requests error", + zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSO, err)) + } + return + } + if maxBatchWaitInterval >= 0 { + tbc.adjustBestBatchSize() + } + streamLoopTimer.Reset(c.option.timeout) + // Choose a stream to send the TSO gRPC request. + streamChoosingLoop: + for { + connectionCtx := c.chooseStream(&connectionCtxs) + if connectionCtx != nil { + streamAddr, stream, streamCtx, cancel = connectionCtx.streamAddr, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel + } + // Check stream and retry if necessary. + if stream == nil { + log.Info("[pd/tso] tso stream is not ready", zap.String("dc", dc)) + if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { + continue streamChoosingLoop + } + select { + case <-dispatcherCtx.Done(): + return + case <-streamLoopTimer.C: + err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) + log.Error("[pd/tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) + c.bc.ScheduleCheckMemberChanged() + c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + continue tsoBatchLoop + case <-time.After(retryInterval): + continue streamChoosingLoop + } + } + select { + case <-streamCtx.Done(): + log.Info("[pd/tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) + // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. + connectionCtxs.Delete(streamAddr) + cancel() + stream = nil + continue + default: + break streamChoosingLoop + } + } + done := make(chan struct{}) + dl := deadline{ + timer: time.After(c.option.timeout), + done: done, + cancel: cancel, + } + tsDeadlineCh, ok := c.tsDeadline.Load(dc) + for !ok || tsDeadlineCh == nil { + c.scheduleCheckTSDeadline() + time.Sleep(time.Millisecond * 100) + tsDeadlineCh, ok = c.tsDeadline.Load(dc) + } + select { + case <-dispatcherCtx.Done(): + return + case tsDeadlineCh.(chan deadline) <- dl: + } + opts = extractSpanReference(tbc, opts[:0]) + err = c.processTSORequests(stream, dc, tbc, opts) + close(done) + // If error happens during tso stream handling, reset stream and run the next trial. + if err != nil { + select { + case <-dispatcherCtx.Done(): + return + default: + } + c.bc.ScheduleCheckMemberChanged() + log.Error("[pd/tso] getTS error", zap.String("dc-location", dc), zap.String("stream-addr", streamAddr), errs.ZapError(errs.ErrClientGetTSO, err)) + // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. 
+			connectionCtxs.Delete(streamAddr)
+			cancel()
+			stream = nil
+			// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we had better check the members ASAP.
+			if IsLeaderChange(err) {
+				if err := c.bc.CheckMemberChanged(); err != nil {
+					select {
+					case <-dispatcherCtx.Done():
+						return
+					default:
+					}
+				}
+				// Because the TSO Follower Proxy could be configured online,
+				// if we change it from on -> off, the background updateTSOConnectionCtxs
+				// will cancel the current stream, and then the EOF error caused by cancel()
+				// should not trigger updateTSOConnectionCtxs here.
+				// So we should only call it when the leader changes.
+				c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
+			}
+		}
+	}
+}
+
+// TSO Follower Proxy only supports the Global TSO proxy now.
+func (c *client) allowTSOFollowerProxy(dc string) bool {
+	return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy()
+}
+
+// chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
+// connectionCtxs will only have one stream to choose from when the TSO Follower Proxy is off.
+func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) {
+	idx := 0
+	connectionCtxs.Range(func(addr, cc interface{}) bool {
+		j := rand.Intn(idx + 1)
+		if j < 1 {
+			connectionCtx = cc.(*tsoConnectionContext)
+		}
+		idx++
+		return true
+	})
+	return connectionCtx
+}
+
+type tsoConnectionContext struct {
+	streamAddr string
+	// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
+	// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
+	stream tsoStream
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func (c *client) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
+	// Normal connection creation; it is affected by the `enableForwarding` option.
+	createTSOConnection := c.tryConnectToTSO
+	if c.allowTSOFollowerProxy(dc) {
+		createTSOConnection = c.tryConnectToTSOWithProxy
+	}
+	if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
+		log.Error("[pd/tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
+		return false
+	}
+	return true
+}
+
+// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable
+// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
+// and a new daemon will also be created to switch back to a normal leader connection as soon as the
+// connection comes back to normal.
+func (c *client) tryConnectToTSO(
+	dispatcherCtx context.Context,
+	dc string,
+	connectionCtxs *sync.Map,
+) error {
+	var (
+		networkErrNum uint64
+		err           error
+		stream        tsoStream
+		url           string
+		cc            *grpc.ClientConn
+	)
+	updateAndClear := func(newAddr string, connectionCtx *tsoConnectionContext) {
+		if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
+			// If the previous connection still exists, we should close it first.
+ cc.(*tsoConnectionContext).cancel() + connectionCtxs.Store(newAddr, connectionCtx) + } + connectionCtxs.Range(func(addr, cc interface{}) bool { + if addr.(string) != newAddr { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(addr) + } + return true + }) + } + // retry several times before falling back to the follower when the network problem happens + + for i := 0; i < maxRetryTimes; i++ { + c.bc.ScheduleCheckMemberChanged() + cc, url = c.bc.GetTSOAllocatorClientConnByDCLocation(dc) + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + failpoint.Inject("unreachableNetwork", func() { + stream = nil + err = status.New(codes.Unavailable, "unavailable").Err() + }) + if stream != nil && err == nil { + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return nil + } + + if err != nil && c.option.enableForwarding { + // The reason we need to judge if the error code is equal to "Canceled" here is that + // when we create a stream we use a goroutine to manually control the timeout of the connection. + // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. + // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. + // And actually the `Canceled` error can be regarded as a kind of network error in some way. + if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { + networkErrNum++ + } + } + + cancel() + select { + case <-dispatcherCtx.Done(): + return err + case <-time.After(retryInterval): + } + } + + if networkErrNum == maxRetryTimes { + // encounter the network error + backupClientConn, addr := c.backupClientConn() + if backupClientConn != nil { + log.Info("[pd/tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) + forwardedHost, ok := c.bc.GetTSOAllocatorServingAddrByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + + // create the follower stream + cctx, cancel := context.WithCancel(dispatcherCtx) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) + if err == nil { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addrTrim := trimHTTPPrefix(addr) + // the goroutine is used to check the network and change back to the original stream + go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, updateAndClear) + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) + updateAndClear(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + return nil + } + cancel() + } + } + return err +} + +// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers +// or of keyspace group primary/secondaries. 
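+// Only endpoints whose gRPC health check reports SERVING are given a stream builder.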
+func (c *client) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
+	var (
+		addrs          = c.bc.GetURLs()
+		streamBuilders = make(map[string]tsoStreamBuilder, len(addrs))
+		cc             *grpc.ClientConn
+		err            error
+	)
+	for _, addr := range addrs {
+		if len(addr) == 0 {
+			continue
+		}
+		if cc, err = c.bc.GetOrCreateGRPCConn(addr); err != nil {
+			continue
+		}
+		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
+		resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
+		healthCancel()
+		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
+			streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc)
+		}
+	}
+	return streamBuilders
+}
+
+// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
+// a TSO proxy to reduce the pressure of the main serving service endpoint.
+func (c *client) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
+	tsoStreamBuilders := c.getAllTSOStreamBuilders()
+	leaderAddr := c.GetLeaderAddr()
+	forwardedHost, ok := c.bc.GetTSOAllocatorServingAddrByDCLocation(dc)
+	if !ok {
+		return errors.Errorf("cannot find the allocator leader in %s", dc)
+	}
+	// GC the stale ones.
+	connectionCtxs.Range(func(addr, cc interface{}) bool {
+		if _, ok := tsoStreamBuilders[addr.(string)]; !ok {
+			cc.(*tsoConnectionContext).cancel()
+			connectionCtxs.Delete(addr)
+		}
+		return true
+	})
+	// Update the missing ones.
+	for addr, tsoStreamBuilder := range tsoStreamBuilders {
+		if _, ok = connectionCtxs.Load(addr); ok {
+			continue
+		}
+		cctx, cancel := context.WithCancel(dispatcherCtx)
+		// Do not proxy the leader client.
+		if addr != leaderAddr {
+			log.Info("[pd/tso] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr))
+			cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
+		}
+		// Create the TSO stream.
+		stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout)
+		if err == nil {
+			if addr != leaderAddr {
+				forwardedHostTrim := trimHTTPPrefix(forwardedHost)
+				addrTrim := trimHTTPPrefix(addr)
+				requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
+			}
+			connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
+			continue
+		}
+		log.Error("[pd/tso] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
+		cancel()
+	}
+	return nil
+}
+
+func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption {
+	for _, req := range tbc.getCollectedRequests() {
+		if span := opentracing.SpanFromContext(req.requestCtx); span != nil {
+			opts = append(opts, opentracing.ChildOf(span.Context()))
+		}
+	}
+	return opts
+}
+
+func (c *client) processTSORequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error {
+	if len(opts) > 0 {
+		span := opentracing.StartSpan("pdclient.processTSORequests", opts...)
+		defer span.Finish()
+	}
+
+	requests := tbc.getCollectedRequests()
+	count := int64(len(requests))
+	physical, logical, suffixBits, err := stream.processRequests(c.GetClusterID(c.ctx), dcLocation, requests, tbc.batchStartTime)
+	if err != nil {
+		c.finishTSORequest(requests, 0, 0, 0, err)
+		return err
+	}
+	// `logical` is the logical part of the largest TS in this batch, so we need to do the
+	// subtracting before we finish each TSO request.
+	firstLogical := addLogical(logical, -count+1, suffixBits)
+	c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
+	c.finishTSORequest(requests, physical, firstLogical, suffixBits, nil)
+	return nil
+}
+
+// Because of the suffix, we need to shift the count before we add it to the logical part.
+func addLogical(logical, count int64, suffixBits uint32) int64 {
+	return logical + count<<suffixBits
+}
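For readers following the suffix-bits arithmetic above, here is a minimal standalone sketch of how a batch's logical values are laid out. `addLogical` is copied from the hunk above; the per-request assignment loop mirrors what `finishTSORequest` is expected to do, but that function is not shown in this diff, so treat the loop as an illustrative assumption rather than the actual implementation.

```go
package main

import "fmt"

// addLogical is copied from the diff above: the count is shifted left by suffixBits
// because the low suffixBits bits of a logical timestamp are reserved as a suffix.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	// Suppose the TSO stream answered a batch of 3 requests and returned the
	// *largest* logical value of the batch: logical = 40 with suffixBits = 2.
	var (
		logical    int64  = 40
		count      int64  = 3
		suffixBits uint32 = 2
	)
	// Step back count-1 slots to recover the first request's logical value:
	// 40 + (-2 << 2) = 32.
	firstLogical := addLogical(logical, -count+1, suffixBits)
	// Assumed per-request assignment: the i-th request gets firstLogical + i<<suffixBits.
	for i := int64(0); i < count; i++ {
		fmt.Println(addLogical(firstLogical, i, suffixBits)) // prints 32, 36, 40
	}
}
```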