From e74aa096da7255f60908fba52d6180d3c677d605 Mon Sep 17 00:00:00 2001
From: Bin Shi
Date: Thu, 9 Feb 2023 17:36:43 -0800
Subject: [PATCH] Implement gRPC API GetMembers() and Tso()

Signed-off-by: Bin Shi
---
 pkg/mcs/tso/server/grpc_service.go |  32 ++-
 pkg/mcs/tso/server/server.go       | 123 ++++++++++-
 pkg/mode/tso.go                    |  10 +-
 pkg/tso/grpc_server.go             | 316 +++++++++++++++++++++++++++++
 pkg/tso/metrics.go                 |  30 +++
 pkg/utils/grpcutil/grpcutil.go     |  12 ++
 server/grpc_service.go             | 304 +++------------------------
 server/metrics.go                  |  30 ---
 server/server.go                   |   5 +
 9 files changed, 540 insertions(+), 322 deletions(-)
 create mode 100644 pkg/tso/grpc_server.go

diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go
index d3f652f9e34a..f7e1daaabde4 100644
--- a/pkg/mcs/tso/server/grpc_service.go
+++ b/pkg/mcs/tso/server/grpc_service.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/kvproto/pkg/tsopb"
 	"github.com/tikv/pd/pkg/mcs/registry"
+	"github.com/tikv/pd/pkg/tso"
 	"github.com/tikv/pd/server"
 	"google.golang.org/grpc"
 )
@@ -39,7 +40,7 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.Write([]byte("not implemented"))
 }
 
-// Service is the gRPC service for TSO.
+// Service is the TSO grpc service.
 type Service struct {
 	ctx    context.Context
 	server *Server
@@ -65,14 +66,37 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler
 	server.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
 }
 
-// GetMembers implements gRPC PDServer.
+// GetMembers implements gRPC Server.
 func (s *Service) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
-	return nil, nil
+	// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
+	// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
+	members, err := s.server.GetMembers()
+	if err != nil {
+		return &pdpb.GetMembersResponse{
+			Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
+	}
+
+	tsoAllocatorManager := s.server.GetTSOAllocatorManager()
+	tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
+	if err != nil {
+		return &pdpb.GetMembersResponse{
+			Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
+	}
+
+	return &pdpb.GetMembersResponse{
+		Header:              tso.Header(s.server.clusterID),
+		Members:             members,
+		Leader:              nil,
+		EtcdLeader:          nil,
+		TsoAllocatorLeaders: tsoAllocatorLeaders,
+	}, nil
 }
 
 // Tso returns a stream of timestamps
 func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
-	return nil
+	return tso.Tso(s.server, stream)
 }
 
 // SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 9279c24713ef..675e63da4d57 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -17,22 +17,38 @@ package server
 import (
 	"context"
 	"net/http"
+	"sync"
+	"time"
 
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/kvproto/pkg/tsopb"
 	basicsvr "github.com/tikv/pd/pkg/basic_server"
+	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/member"
+	"github.com/tikv/pd/pkg/tso"
+	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"github.com/tikv/pd/server/cluster"
 	"go.etcd.io/etcd/clientv3"
+	"google.golang.org/grpc"
 )
 
 // If server doesn't implement all methods of basicsvr.Server, this line will result in a clear
 // error message like "*Server does not implement basicsvr.Server (missing Method method)"
 var _ basicsvr.Server = (*Server)(nil)
+var _ tso.GrpcServer = (*Server)(nil)
 
-// Server is the TSO server, and it implements basicsvr.Server.
+// Server is the TSO server, and it implements basicsvr.Server and tso.GrpcServer.
 type Server struct {
-	ctx    context.Context
-	name   string
-	client *clientv3.Client
-	member *member.Member
+	ctx                 context.Context
+	name                string
+	clusterID           uint64
+	client              *clientv3.Client
+	member              *member.Member
+	tsoAllocatorManager *tso.AllocatorManager
+	// Store as map[string]*grpc.ClientConn
+	clientConns sync.Map
+	// Store as map[string]chan *tsoRequest
+	tsoDispatcher sync.Map
 	// Callback functions for different stages
 	// startCallbacks will be called after the server is started.
 	startCallbacks []func()
@@ -49,7 +65,7 @@ func NewServer(ctx context.Context, client *clientv3.Client) *Server {
 	}
 }
 
-// TODO: Implement the following methods defined in basicsvr.Server
+// Implement the following methods defined in basicsvr.Server
 
 // Name returns the unique etcd Name for this server in etcd cluster.
 func (s *Server) Name() string {
@@ -94,3 +110,98 @@ func (s *Server) GetMember() *member.Member {
 func (s *Server) AddLeaderCallback(callbacks ...func(context.Context)) {
 	s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
 }
+
+// Implement the following methods defined in tso.GrpcServer
+
+// ClusterID returns the cluster ID of this server.
+func (s *Server) ClusterID() uint64 {
+	return s.clusterID
+}
+
+// IsClosed checks if the server loop is closed
+func (s *Server) IsClosed() bool {
+	// TODO: implement it
+	return true
+}
+
+// IsLocalRequest checks if the forwarded host is the current host
+func (s *Server) IsLocalRequest(forwardedHost string) bool {
+	if forwardedHost == "" {
+		return true
+	}
+	memberAddrs := s.GetMember().Member().GetClientUrls()
+	for _, addr := range memberAddrs {
+		if addr == forwardedHost {
+			return true
+		}
+	}
+	return false
+}
+
+// GetTSOAllocatorManager returns the manager of TSO Allocator.
+func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
+	return s.tsoAllocatorManager
+}
+
+// GetTSODispatcher gets the TSO Dispatcher
+func (s *Server) GetTSODispatcher() *sync.Map {
+	return &s.tsoDispatcher
+}
+
+// CreateTsoForwardStream creates the forward stream in the type of pdpb.PD_TsoClient
+// which is the same type as tsopb.TSO_TsoClient.
+func (s *Server) CreateTsoForwardStream(client *grpc.ClientConn) (pdpb.PD_TsoClient, context.CancelFunc, error) {
+	done := make(chan struct{})
+	ctx, cancel := context.WithCancel(s.ctx)
+	go checkStream(ctx, cancel, done)
+	forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx)
+	done <- struct{}{}
+	return (pdpb.PD_TsoClient)(forwardStream), cancel, err
+}
+
+// GetDelegateClient returns the grpc client connection talking to the forwarded host
+func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
+	client, ok := s.clientConns.Load(forwardedHost)
+	if !ok {
+		tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
+		if err != nil {
+			return nil, err
+		}
+		cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
+		if err != nil {
+			return nil, err
+		}
+		client = cc
+		s.clientConns.Store(forwardedHost, cc)
+	}
+	return client.(*grpc.ClientConn), nil
+}
+
+// TODO: If the goroutine here times out after a stream is created successfully, we need to handle it correctly.
+func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
+	select {
+	case <-done:
+		return
+	case <-time.After(3 * time.Second):
+		cancel()
+	case <-streamCtx.Done():
+	}
+	<-done
+}
+
+// GetTLSConfig gets the security config.
+// TODO: implement it
+func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
+	return nil
+}
+
+// Implement other methods
+
+// GetMembers returns TSO server list.
+func (s *Server) GetMembers() ([]*pdpb.Member, error) {
+	if s.IsClosed() {
+		return nil, errs.ErrServerNotStarted.FastGenByArgs()
+	}
+	members, err := cluster.GetMembers(s.GetClient())
+	return members, err
+}
diff --git a/pkg/mode/tso.go b/pkg/mode/tso.go
index d674e3668bcd..af953838639b 100644
--- a/pkg/mode/tso.go
+++ b/pkg/mode/tso.go
@@ -20,12 +20,10 @@ import (
 	"github.com/pingcap/log"
 	basicsvr "github.com/tikv/pd/pkg/basic_server"
-	_ "github.com/tikv/pd/pkg/mcs/registry"
 	tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
 	"github.com/tikv/pd/server/config"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
-	_ "google.golang.org/grpc"
 )
 
 // TSOStart starts the TSO server.
@@ -57,9 +55,9 @@ func TSOStart(ctx context.Context, cfg *config.Config) basicsvr.Server {
 	}
 	// start server
 	svr := tsosvr.NewServer(ctx, client)
-	// TODO: wait for #5933 to check-in
-	//gs := grpc.NewServer()
-	//registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
-	//registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
+	// TODO: wait for #5933 to uncomment the following lines
+	// gs := grpc.NewServer()
+	// registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
+	// registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
 	return svr
 }
diff --git a/pkg/tso/grpc_server.go b/pkg/tso/grpc_server.go
new file mode 100644
index 000000000000..c17c25cc8d17
--- /dev/null
+++ b/pkg/tso/grpc_server.go
@@ -0,0 +1,316 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tso
+
+import (
+	"context"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	maxMergeTSORequests    = 10000
+	defaultTSOProxyTimeout = 3 * time.Second
+)
+
+// GrpcServer is the interface a server must implement to provide the TSO grpc service.
+type GrpcServer interface {
+	ClusterID() uint64
+	IsClosed() bool
+	IsLocalRequest(forwardedHost string) bool
+	GetTSOAllocatorManager() *AllocatorManager
+	GetTSODispatcher() *sync.Map
+	CreateTsoForwardStream(client *grpc.ClientConn) (pdpb.PD_TsoClient, context.CancelFunc, error)
+	GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error)
+}
+
+// Tso implements gRPC PDServer.
+func Tso(s GrpcServer, stream pdpb.PD_TsoServer) error {
+	var (
+		doneCh chan struct{}
+		errCh  chan error
+	)
+	ctx, cancel := context.WithCancel(stream.Context())
+	defer cancel()
+	for {
+		// Prevent unnecessary performance overhead of the channel.
+		if errCh != nil {
+			select {
+			case err := <-errCh:
+				return errors.WithStack(err)
+			default:
+			}
+		}
+		request, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return errors.WithStack(err)
+		}
+
+		streamCtx := stream.Context()
+		forwardedHost := grpcutil.GetForwardedHost(streamCtx)
+		if !s.IsLocalRequest(forwardedHost) {
+			if errCh == nil {
+				doneCh = make(chan struct{})
+				defer close(doneCh)
+				errCh = make(chan error)
+			}
+			dispatchTSORequest(ctx, s, &tsoRequest{
+				forwardedHost,
+				request,
+				stream,
+			}, forwardedHost, doneCh, errCh)
+			continue
+		}
+
+		start := time.Now()
+		// TSO uses leader lease to determine validity. No need to check leader here.
+		if s.IsClosed() {
+			return status.Errorf(codes.Unknown, "server not started")
+		}
+		if request.GetHeader().GetClusterId() != s.ClusterID() {
+			return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.ClusterID(), request.GetHeader().GetClusterId())
+		}
+		count := request.GetCount()
+		ts, err := s.GetTSOAllocatorManager().HandleTSORequest(request.GetDcLocation(), count)
+		if err != nil {
+			return status.Errorf(codes.Unknown, err.Error())
+		}
+		tsoHandleDuration.Observe(time.Since(start).Seconds())
+		response := &pdpb.TsoResponse{
+			Header:    Header(s.ClusterID()),
+			Timestamp: &ts,
+			Count:     count,
+		}
+		if err := stream.Send(response); err != nil {
+			return errors.WithStack(err)
+		}
+	}
+}
+
+// Header is a helper function to wrap the response header for the given cluster id
+func Header(clusterID uint64) *pdpb.ResponseHeader {
+	if clusterID == 0 {
+		return WrapErrorToHeader(clusterID, pdpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready")
+	}
+	return &pdpb.ResponseHeader{ClusterId: clusterID}
+}
+
+// WrapErrorToHeader is a helper function to wrap the response header for the given cluster id and the error info
+func WrapErrorToHeader(clusterID uint64, errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
+	return ErrorHeader(
+		clusterID,
+		&pdpb.Error{
+			Type:    errorType,
+			Message: message,
+		})
+}
+
+// ErrorHeader is a helper function to wrap the response header for the given cluster id and the error
+func ErrorHeader(clusterID uint64, err *pdpb.Error) *pdpb.ResponseHeader {
+	return &pdpb.ResponseHeader{
+		ClusterId: clusterID,
+		Error:     err,
+	}
+}
+
+type tsoRequest struct {
+	forwardedHost string
+	request       *pdpb.TsoRequest
+	stream        pdpb.PD_TsoServer
+}
+
+func dispatchTSORequest(ctx context.Context, s GrpcServer, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
+	tsoRequestChInterface, loaded := s.GetTSODispatcher().LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
+	if !loaded {
+		tsDeadlineCh := make(chan deadline, 1)
+		go handleDispatcher(ctx, s, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh)
+		go watchTSDeadline(ctx, tsDeadlineCh)
+	}
+	tsoRequestChInterface.(chan *tsoRequest) <- request
+}
+
+func handleDispatcher(ctx context.Context, s GrpcServer, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
+	dispatcherCtx, ctxCancel := context.WithCancel(ctx)
+	defer ctxCancel()
+	defer s.GetTSODispatcher().Delete(forwardedHost)
+
+	var (
+		forwardStream pdpb.PD_TsoClient
+		cancel        context.CancelFunc
+	)
+	client, err := s.GetDelegateClient(ctx, forwardedHost)
+	if err != nil {
+		goto errHandling
+	}
+	log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost))
+	forwardStream, cancel, err = s.CreateTsoForwardStream(client)
+errHandling:
+	if err != nil || forwardStream == nil {
+		log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err))
+		select {
+		case <-dispatcherCtx.Done():
+			return
+		case _, ok := <-doneCh:
+			if !ok {
+				return
+			}
+		case errCh <- err:
+			close(errCh)
+			return
+		}
+	}
+	defer cancel()
+
+	requests := make([]*tsoRequest, maxMergeTSORequests+1)
+	for {
+		select {
+		case first := <-tsoRequestCh:
+			pendingTSOReqCount := len(tsoRequestCh) + 1
+			requests[0] = first
+			for i := 1; i < pendingTSOReqCount; i++ {
+				requests[i] = <-tsoRequestCh
+			}
+			done := make(chan struct{})
+			dl := deadline{
+				timer:  time.After(defaultTSOProxyTimeout),
+				done:   done,
+				cancel: cancel,
+			}
+			select {
+			case tsDeadlineCh <- dl:
+			case <-dispatcherCtx.Done():
+				return
+			}
+			err = processTSORequests(s, forwardStream, requests[:pendingTSOReqCount])
+			close(done)
+			if err != nil {
+				log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
+				select {
+				case <-dispatcherCtx.Done():
+					return
+				case _, ok := <-doneCh:
+					if !ok {
+						return
+					}
+				case errCh <- err:
+					close(errCh)
+					return
+				}
+			}
+		case <-dispatcherCtx.Done():
+			return
+		}
+	}
+}
+
+func processTSORequests(s GrpcServer, forwardStream pdpb.PD_TsoClient, requests []*tsoRequest) error {
+	start := time.Now()
+	// Merge the requests
+	count := uint32(0)
+	for _, request := range requests {
+		count += request.request.GetCount()
+	}
+	req := &pdpb.TsoRequest{
+		Header: requests[0].request.GetHeader(),
+		Count:  count,
+		// TODO: support Local TSO proxy forwarding.
+		DcLocation: requests[0].request.GetDcLocation(),
+	}
+	// Send to the leader stream.
+	if err := forwardStream.Send(req); err != nil {
+		return err
+	}
+	resp, err := forwardStream.Recv()
+	if err != nil {
+		return err
+	}
+	tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
+	tsoProxyBatchSize.Observe(float64(count))
+	// Split the response
+	physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
+	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
+	// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
+	// count is 5, then the splitting results should be 5 and 10.
+	firstLogical := addLogical(logical, -int64(count), suffixBits)
+	return finishTSORequest(s, requests, physical, firstLogical, suffixBits)
+}
+
+// Because of the suffix, we need to shift the count before we add it to the logical part.
+func addLogical(logical, count int64, suffixBits uint32) int64 {
+	return logical + count<
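
For reference, the timestamp split performed by processTSORequests above works as follows: the proxy merges the counts of all pending requests into one forwarded TsoRequest, the single response only carries the largest timestamp of the whole batch, and each sub-request then recovers its own logical value by first subtracting the total count and then adding the per-request counts cumulatively. The sketch below restates that arithmetic in isolation; splitTimestamps and the example counts are illustrative assumptions, not the patch's finishTSORequest (whose body lies outside the hunks shown above), while addLogical mirrors the helper added in pkg/tso/grpc_server.go.

package main

import "fmt"

// addLogical mirrors the helper in pkg/tso/grpc_server.go: when suffix bits are in
// use, the count has to be shifted before it is added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

// splitTimestamps is a hypothetical stand-in for finishTSORequest: starting from the
// largest logical value returned for the merged batch, it derives the logical value
// handed back to each sub-request by subtracting the total count first and then
// adding the per-request counts cumulatively.
func splitTimestamps(largestLogical int64, counts []uint32, suffixBits uint32) []int64 {
	total := int64(0)
	for _, c := range counts {
		total += int64(c)
	}
	cursor := addLogical(largestLogical, -total, suffixBits)
	results := make([]int64, 0, len(counts))
	for _, c := range counts {
		cursor = addLogical(cursor, int64(c), suffixBits)
		results = append(results, cursor)
	}
	return results
}

func main() {
	// Matches the comment in processTSORequests: the largest logical part is 10 and
	// the merged count is 5 (split here as 2+3), so the batch starts right after 5.
	fmt.Println(splitTimestamps(10, []uint32{2, 3}, 0)) // [7 10]
}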