Skip to content

Commit

Permalink
Implement gPRC API GetMembers() and Tso()
Browse files Browse the repository at this point in the history
rebase master

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 11, 2023
1 parent add93a7 commit c1ffc53
Show file tree
Hide file tree
Showing 15 changed files with 2,294 additions and 356 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto d4c6f1888b87b9be694edb116fc5f3a2869617d7
replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto v0.0.0-20230209214751-d4c6f1888b87
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/binshi-bing/kvproto v0.0.0-20230209214751-d4c6f1888b87 h1:1C/ew7iNB6JeOArr4PElV5SZoLaMC3yt1DTVklJra3c=
github.com/binshi-bing/kvproto v0.0.0-20230209214751-d4c6f1888b87/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -79,8 +81,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
50 changes: 40 additions & 10 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ import (

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"google.golang.org/grpc"
)

var _ tsopb.TSOServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(srv *Service) (http.Handler, server.APIServiceGroup) {
return dummyRestService{}, server.APIServiceGroup{}
var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
return dummyRestService{}, apiutil.APIServiceGroup{}
}

type dummyRestService struct{}
Expand All @@ -39,18 +42,22 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("not implemented"))
}

// Service is the gRPC service for TSO.
// Service is the TSO grpc service.
type Service struct {
ctx context.Context
server *Server
// settings
}

// NewService creates a new TSO service.
func NewService(svr *Server) registry.RegistrableService {
func NewService(svr bs.Server) registry.RegistrableService {
server, ok := svr.(*Server)
if !ok {
log.Fatal("create tso server failed")
}
return &Service{
ctx: svr.Context(),
server: svr,
server: server,
}
}

Expand All @@ -62,17 +69,40 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
handler, group := SetUpRestHandler(s)
server.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// GetMembers implements gRPC PDServer.
// GetMembers implements gRPC Server.
func (s *Service) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
return nil, nil
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
members, err := s.server.GetMembers()
if err != nil {
return &pdpb.GetMembersResponse{
Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

tsoAllocatorManager := s.server.GetTSOAllocatorManager()
tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return &pdpb.GetMembersResponse{
Header: tso.WrapErrorToHeader(s.server.clusterID, pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.GetMembersResponse{
Header: tso.Header(s.server.clusterID),
Members: members,
Leader: nil,
EtcdLeader: nil,
TsoAllocatorLeaders: tsoAllocatorLeaders,
}, nil
}

// Tso returns a stream of timestamps
func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return nil
return tso.Tso(s.server, stream)
}

// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
Expand Down
129 changes: 120 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,38 @@ package server
import (
"context"
"net/http"
"sync"
"time"

basicsvr "github.com/tikv/pd/pkg/basicserver"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/server/cluster"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)

// If server doesn't implement all methods of basicsvr.Server, this line will result in a clear
// error message like "*Server does not implement basicsvr.Server (missing Method method)"
var _ basicsvr.Server = (*Server)(nil)
// If server doesn't implement all methods of bs.Server, this line will result in a clear
// error message like "*Server does not implement bs.Server (missing Method method)"
var _ bs.Server = (*Server)(nil)
var _ tso.GrpcServer = (*Server)(nil)

// Server is the TSO server, and it implements basicsvr.Server.
// Server is the TSO server, and it implements bs.Server and tso.GrpcServer.
type Server struct {
ctx context.Context
name string
client *clientv3.Client
member *member.Member
ctx context.Context
name string
clusterID uint64
client *clientv3.Client
member *member.Member
tsoAllocatorManager *tso.AllocatorManager
// Store as map[string]*grpc.ClientConn
clientConns sync.Map
// Store as map[string]chan *tsoRequest
tsoDispatcher sync.Map
// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand Down Expand Up @@ -94,3 +110,98 @@ func (s *Server) GetMember() *member.Member {
func (s *Server) AddLeaderCallback(callbacks ...func(context.Context)) {
s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}

// Implement the following methods defined in tso.GrpcServer

// ClusterID returns the cluster ID of this server.
func (s *Server) ClusterID() uint64 {
return s.clusterID
}

// IsClosed checks if the server loop is closed
func (s *Server) IsClosed() bool {
// TODO: implement it
return true
}

// IsLocalRequest checks if the forwarded host is the current host
func (s *Server) IsLocalRequest(forwardedHost string) bool {
if forwardedHost == "" {
return true
}
memberAddrs := s.GetMember().Member().GetClientUrls()
for _, addr := range memberAddrs {
if addr == forwardedHost {
return true
}
}
return false
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// GetTSODispatcher gets the TSO Dispatcher
func (s *Server) GetTSODispatcher() *sync.Map {
return &s.tsoDispatcher
}

// CreateTsoForwardStream creats the forward stream in the type of pdpb.PD_TsoClient
// which is the same type as tsopb.TSO_TsoClient.
func (s *Server) CreateTsoForwardStream(client *grpc.ClientConn) (pdpb.PD_TsoClient, context.CancelFunc, error) {
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go checkStream(ctx, cancel, done)
forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx)
done <- struct{}{}
return (pdpb.PD_TsoClient)(forwardStream), cancel, err
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// TODO: If goroutine here timeout after a stream is created successfully, we need to handle it correctly.
func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
select {
case <-done:
return
case <-time.After(3 * time.Second):
cancel()
case <-streamCtx.Done():
}
<-done
}

// GetTLSConfig get the security config.
// TODO: implement it
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return nil
}

// Implement other methods

// GetMembers returns TSO server list.
func (s *Server) GetMembers() ([]*pdpb.Member, error) {
if s.IsClosed() {
return nil, errs.ErrServerNotStarted.FastGenByArgs()
}
members, err := cluster.GetMembers(s.GetClient())
return members, err
}
16 changes: 8 additions & 8 deletions pkg/mode/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ import (
"time"

"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
_ "github.com/tikv/pd/pkg/mcs/registry"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
_ "google.golang.org/grpc"
"google.golang.org/grpc"
)

// TSOStart starts the TSO server.
func TSOStart(ctx context.Context, cfg *config.Config) basicsvr.Server {
func TSOStart(ctx context.Context, cfg *config.Config) bs.Server {
// start client
etcdTimeout := time.Second * 3
tlsConfig, err := cfg.Security.ToTLSConfig()
Expand All @@ -55,11 +55,11 @@ func TSOStart(ctx context.Context, cfg *config.Config) basicsvr.Server {
if err != nil {
return nil
}

// start server
svr := tsosvr.NewServer(ctx, client)
// TODO: wait for #5933 to check-in
//gs := grpc.NewServer()
//registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
//registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
gs := grpc.NewServer()
registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
return svr
}
Loading

0 comments on commit c1ffc53

Please sign in to comment.