Skip to content

Commit

Permalink
Implement rpcs SetExternalTimestamp() and GetExternalTimestamp()
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 12, 2023
1 parent 7d42961 commit 5d8c8f1
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 150 deletions.
38 changes: 7 additions & 31 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,51 +118,27 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
// and write it into all Local TSO Allocators then if it's indeed the biggest one.
func (s *Service) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
return tso.SyncMaxTS(ctx, s.server, request)
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Service) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
// TODO: Check if the keyspace replica is the primary
return tso.GetDCLocationInfo(ctx, s.server, request)
}

// SetExternalTimestamp sets a given external timestamp to perform stale read.
func (s *Service) SetExternalTimestamp(ctx context.Context, request *pdpb.SetExternalTimestampRequest) (*pdpb.SetExternalTimestampResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
forward := func(ctx context.Context, client *grpc.ClientConn, request *pdpb.SetExternalTimestampRequest) (*pdpb.SetExternalTimestampResponse, error) {
return tsopb.NewTSOClient(client).SetExternalTimestamp(ctx, request)
}
return nil, nil
return tso.SetExternalTimestamp(ctx, s.server, request, forward)
}

// GetExternalTimestamp gets the saved external timstamp.
func (s *Service) GetExternalTimestamp(ctx context.Context, request *pdpb.GetExternalTimestampRequest) (*pdpb.GetExternalTimestampResponse, error) {
return nil, nil
}

// validateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between TSO servers internally.
// TODO: check if the sender is from the global TSO allocator
func (s *Service) validateInternalRequest(_ *pdpb.RequestHeader) error {
if s.server.IsClosed() {
return ErrNotStarted
}
return nil
}

// validateRequest checks if Server is leader and clusterID is matched.
// TODO: Call it in gRPC interceptor.
func (s *Service) validateRequest(header *pdpb.RequestHeader) error {
if s.server.IsClosed() || !s.server.GetMember().IsLeader() {
return ErrNotLeader
}
if header.GetClusterId() != s.server.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.server.clusterID, header.GetClusterId())
forward := func(ctx context.Context, client *grpc.ClientConn, request *pdpb.GetExternalTimestampRequest) (*pdpb.GetExternalTimestampResponse, error) {
return tsopb.NewTSOClient(client).GetExternalTimestamp(ctx, request)
}
return nil
return tso.GetExternalTimestamp(ctx, s.server, request, forward)
}
54 changes: 30 additions & 24 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package server

import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/member"
Expand All @@ -32,8 +30,9 @@ import (
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/cluster"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// If server doesn't implement all methods of bs.Server, this line will result in a clear
Expand Down Expand Up @@ -181,6 +180,30 @@ func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*
return client.(*grpc.ClientConn), nil
}

// ValidateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between TSO servers internally.
// TODO: Check if the sender is from the global TSO allocator
func (s *Server) ValidateInternalRequest(_ *pdpb.RequestHeader, _ bool) error {
if s.IsClosed() {
return ErrNotStarted
}
return nil
}

// ValidateRequest checks if the keyspace replica is the primary and clusterID is matched.
// TODO: Check if the keyspace replica is the primary
func (s *Server) ValidateRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() {
return ErrNotLeader
}
if header.GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
}
return nil
}

// Implement other methods

// GetGlobalTS returns global tso.
func (s *Server) GetGlobalTS() (uint64, error) {
ts, err := s.tsoAllocatorManager.GetGlobalTSO()
Expand All @@ -190,30 +213,15 @@ func (s *Server) GetGlobalTS() (uint64, error) {
return tsoutil.GenerateTS(ts), nil
}

// GetExternalTS returns external timestamp.
// TODO: Implement GetExternalTS. Get extentTS from the persistent storage
// GetExternalTS returns external timestamp from the cache or the persistent storage.
// TODO: Implement GetExternalTS
func (s *Server) GetExternalTS() uint64 {
return 0
}

// SetExternalTS returns external timestamp.
// SetExternalTS saves external timestamp to cache and the persistent storage.
// TODO: Implement SetExternalTS
func (s *Server) SetExternalTS(externalTS uint64) error {
globalTS, err := s.GetGlobalTS()
if err != nil {
return err
}
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
desc := "the external timestamp should not be larger than global ts"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than now"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("current external timestamp", currentExternalTS))
return errors.New(desc)
}
// TODO: persistent externalTS
return nil
}

Expand All @@ -235,8 +243,6 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return nil
}

// Implement other methods

// GetMembers returns TSO server list.
func (s *Server) GetMembers() ([]*pdpb.Member, error) {
if s.IsClosed() {
Expand Down
109 changes: 106 additions & 3 deletions pkg/tso/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type GrpcServer interface {
GetTSODispatcher() *sync.Map
CreateTsoForwardStream(client *grpc.ClientConn) (pdpb.PD_TsoClient, context.CancelFunc, error)
GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error)
ValidateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error
ValidateRequest(header *pdpb.RequestHeader) error
GetExternalTS() uint64
SetExternalTS(externalTS uint64) error
}

type tsoRequest struct {
Expand Down Expand Up @@ -128,6 +132,9 @@ var mockLocalAllocatorLeaderChangeFlag = false
// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs is holding when skipCheck is set,
// and write it into all Local TSO Allocators then if it's indeed the biggest one.
func SyncMaxTS(_ context.Context, s GrpcServer, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
if err := s.ValidateInternalRequest(request.GetHeader(), true); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// There is no dc-location found in this server, return err.
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
Expand Down Expand Up @@ -222,6 +229,10 @@ func SyncMaxTS(_ context.Context, s GrpcServer, request *pdpb.SyncMaxTSRequest)

// GetDCLocationInfo gets the dc-location info of the given dc-location from the primary's TSO allocator manager.
func GetDCLocationInfo(ctx context.Context, s GrpcServer, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
var err error
if err = s.ValidateInternalRequest(request.GetHeader(), false); err != nil {
return nil, err
}
am := s.GetTSOAllocatorManager()
info, ok := am.GetDCLocationInfo(request.GetDcLocation())
if !ok {
Expand All @@ -243,7 +254,6 @@ func GetDCLocationInfo(ctx context.Context, s GrpcServer, request *pdpb.GetDCLoc
// So we will force the newly added Local TSO Allocator to have a Global TSO synchronization
// when it becomes the Local TSO Allocator leader.
// Please take a look at https://github.com/tikv/pd/issues/3260 for more details.
var err error
if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil {
return &pdpb.GetDCLocationInfoResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN, err.Error()),
Expand All @@ -252,6 +262,62 @@ func GetDCLocationInfo(ctx context.Context, s GrpcServer, request *pdpb.GetDCLoc
return resp, nil
}

type forwardSetExternalTSRequest func(context.Context, *grpc.ClientConn, *pdpb.SetExternalTimestampRequest) (*pdpb.SetExternalTimestampResponse, error)

// SetExternalTimestamp implements gRPC Server.
func SetExternalTimestamp(ctx context.Context, s GrpcServer, request *pdpb.SetExternalTimestampRequest,
forward forwardSetExternalTSRequest) (*pdpb.SetExternalTimestampResponse, error) {
forwardedHost := grpcutil.GetForwardedHost(ctx)
if !s.IsLocalRequest(forwardedHost) {
client, err := s.GetDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return forward(ctx, client, request)
}

if err := s.ValidateRequest(request.GetHeader()); err != nil {
return nil, err
}

timestamp := request.GetTimestamp()
if err := setExternalTS(s, timestamp); err != nil {
return &pdpb.SetExternalTimestampResponse{Header: invalidValue(s.ClusterID(), err.Error())}, nil
}
log.Debug("set external timestamp",
zap.Uint64("timestamp", timestamp))
return &pdpb.SetExternalTimestampResponse{
Header: Header(s.ClusterID()),
}, nil
}

type forwardGetExternalTSRequest func(context.Context, *grpc.ClientConn, *pdpb.GetExternalTimestampRequest) (*pdpb.GetExternalTimestampResponse, error)

// GetExternalTimestamp implements gRPC Server.
func GetExternalTimestamp(ctx context.Context, s GrpcServer, request *pdpb.GetExternalTimestampRequest,
forward forwardGetExternalTSRequest) (*pdpb.GetExternalTimestampResponse, error) {
forwardedHost := grpcutil.GetForwardedHost(ctx)
if !s.IsLocalRequest(forwardedHost) {
client, err := s.GetDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return forward(ctx, client, request)
}

if err := s.ValidateRequest(request.GetHeader()); err != nil {
return nil, err
}

timestamp := s.GetExternalTS()
return &pdpb.GetExternalTimestampResponse{
Header: Header(s.ClusterID()),
Timestamp: timestamp,
}, nil
}

// Header is a helper function to wrapper the response header for the given cluster id
func Header(clusterID uint64) *pdpb.ResponseHeader {
if clusterID == 0 {
Expand All @@ -262,7 +328,7 @@ func Header(clusterID uint64) *pdpb.ResponseHeader {

// WrapErrorToHeader is a helper function to wrapper the response header for the given cluster id and the error info
func WrapErrorToHeader(clusterID uint64, errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
return ErrorHeader(
return errorHeader(
clusterID,
&pdpb.Error{
Type: errorType,
Expand All @@ -271,13 +337,22 @@ func WrapErrorToHeader(clusterID uint64, errorType pdpb.ErrorType, message strin
}

// ErrorHeader is a helper function to wrapper the response header for the given cluster id and the error
func ErrorHeader(clusterID uint64, err *pdpb.Error) *pdpb.ResponseHeader {
func errorHeader(clusterID uint64, err *pdpb.Error) *pdpb.ResponseHeader {
return &pdpb.ResponseHeader{
ClusterId: clusterID,
Error: err,
}
}

func invalidValue(clusterID uint64, msg string) *pdpb.ResponseHeader {
return errorHeader(
clusterID,
&pdpb.Error{
Type: pdpb.ErrorType_INVALID_VALUE,
Message: msg,
})
}

func dispatchTSORequest(ctx context.Context, s GrpcServer, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
tsoRequestChInterface, loaded := s.GetTSODispatcher().LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
if !loaded {
Expand Down Expand Up @@ -447,3 +522,31 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
}
}
}

func getGlobalTS(s GrpcServer) (uint64, error) {
ts, err := s.GetTSOAllocatorManager().GetGlobalTSO()
if err != nil {
return 0, err
}
return tsoutil.GenerateTS(ts), nil
}

// SetExternalTS returns external timestamp.
func setExternalTS(s GrpcServer, externalTS uint64) error {
globalTS, err := getGlobalTS(s)
if err != nil {
return err
}
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
desc := "the external timestamp should not be larger than global ts"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than now"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("current external timestamp", currentExternalTS))
return errors.New(desc)
}
return s.SetExternalTS(externalTS)
}
Loading

0 comments on commit 5d8c8f1

Please sign in to comment.