Skip to content

Commit

Permalink
Implement gPRC API GetMembers() and Tso()
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 10, 2023
1 parent f64b350 commit e74aa09
Show file tree
Hide file tree
Showing 9 changed files with 540 additions and 322 deletions.
32 changes: 28 additions & 4 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/server"
"google.golang.org/grpc"
)
Expand All @@ -39,7 +40,7 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("not implemented"))
}

// Service is the gRPC service for TSO.
// Service is the TSO grpc service.
type Service struct {
ctx context.Context
server *Server
Expand All @@ -65,14 +66,37 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler
server.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// GetMembers implements gRPC PDServer.
// GetMembers implements gRPC Server.
func (s *Service) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
return nil, nil
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
members, err := s.server.GetMembers()
if err != nil {
return &pdpb.GetMembersResponse{
Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

tsoAllocatorManager := s.server.GetTSOAllocatorManager()
tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return &pdpb.GetMembersResponse{
Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.GetMembersResponse{
Header: tso.Header(s.server.clusterID),
Members: members,
Leader: nil,
EtcdLeader: nil,
TsoAllocatorLeaders: tsoAllocatorLeaders,
}, nil
}

// Tso returns a stream of timestamps
func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return nil
return tso.Tso(s.server, stream)
}

// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
Expand Down
123 changes: 117 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,38 @@ package server
import (
"context"
"net/http"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/server/cluster"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)

// If server doesn't implement all methods of basicsvr.Server, this line will result in a clear
// error message like "*Server does not implement basicsvr.Server (missing Method method)"
var _ basicsvr.Server = (*Server)(nil)
var _ tso.GrpcServer = (*Server)(nil)

// Server is the TSO server, and it implements basicsvr.Server.
// Server is the TSO server, and it implements basicsvr.Server and tso.GrpcServer.
type Server struct {
ctx context.Context
name string
client *clientv3.Client
member *member.Member
ctx context.Context
name string
clusterID uint64
client *clientv3.Client
member *member.Member
tsoAllocatorManager *tso.AllocatorManager
// Store as map[string]*grpc.ClientConn
clientConns sync.Map
// Store as map[string]chan *tsoRequest
tsoDispatcher sync.Map
// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand All @@ -49,7 +65,7 @@ func NewServer(ctx context.Context, client *clientv3.Client) *Server {
}
}

// TODO: Implement the following methods defined in basicsvr.Server
// Implement the following methods defined in basicsvr.Server

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
Expand Down Expand Up @@ -94,3 +110,98 @@ func (s *Server) GetMember() *member.Member {
func (s *Server) AddLeaderCallback(callbacks ...func(context.Context)) {
s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}

// Implement the following methods defined in tso.GrpcServer

// ClusterID returns the cluster ID of this server.
func (s *Server) ClusterID() uint64 {
return s.clusterID
}

// IsClosed checks if the server loop is closed
func (s *Server) IsClosed() bool {
// TODO: implement it
return true
}

// IsLocalRequest checks if the forwarded host is the current host
func (s *Server) IsLocalRequest(forwardedHost string) bool {
if forwardedHost == "" {
return true
}
memberAddrs := s.GetMember().Member().GetClientUrls()
for _, addr := range memberAddrs {
if addr == forwardedHost {
return true
}
}
return false
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// GetTSODispatcher gets the TSO Dispatcher
func (s *Server) GetTSODispatcher() *sync.Map {
return &s.tsoDispatcher
}

// CreateTsoForwardStream creats the forward stream in the type of pdpb.PD_TsoClient
// which is the same type as tsopb.TSO_TsoClient.
func (s *Server) CreateTsoForwardStream(client *grpc.ClientConn) (pdpb.PD_TsoClient, context.CancelFunc, error) {
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go checkStream(ctx, cancel, done)
forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx)
done <- struct{}{}
return (pdpb.PD_TsoClient)(forwardStream), cancel, err
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// TODO: If goroutine here timeout after a stream is created successfully, we need to handle it correctly.
func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
select {
case <-done:
return
case <-time.After(3 * time.Second):
cancel()
case <-streamCtx.Done():
}
<-done
}

// GetTLSConfig get the security config.
// TODO: implement it
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return nil
}

// Implement other methods

// GetMembers returns TSO server list.
func (s *Server) GetMembers() ([]*pdpb.Member, error) {
if s.IsClosed() {
return nil, errs.ErrServerNotStarted.FastGenByArgs()
}
members, err := cluster.GetMembers(s.GetClient())
return members, err
}
10 changes: 4 additions & 6 deletions pkg/mode/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (

"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
_ "github.com/tikv/pd/pkg/mcs/registry"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
_ "google.golang.org/grpc"
)

// TSOStart starts the TSO server.
Expand Down Expand Up @@ -57,9 +55,9 @@ func TSOStart(ctx context.Context, cfg *config.Config) basicsvr.Server {
}
// start server
svr := tsosvr.NewServer(ctx, client)
// TODO: wait for #5933 to check-in
//gs := grpc.NewServer()
//registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
//registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
// TODO: wait for #5933 to uncomment the following lines
// gs := grpc.NewServer()
// registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
// registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
return svr
}
Loading

0 comments on commit e74aa09

Please sign in to comment.