Add and refactor SyncMaxTS() and GetDCLocationInfo()
Signed-off-by: Bin Shi <[email protected]>
binshi-bing committed Feb 11, 2023
1 parent c1ffc53 commit 37f6c44
Showing 6 changed files with 231 additions and 137 deletions.
47 changes: 44 additions & 3 deletions pkg/mcs/tso/server/grpc_service.go
@@ -26,6 +26,16 @@ import (
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// gRPC errors
var (
// ErrNotLeader is returned when the current server is not the leader and it is not possible to process the request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

var _ tsopb.TSOServer = (*Service)(nil)
@@ -107,21 +117,52 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {

// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
// and, if it is indeed the biggest one, write it into all Local TSO Allocators.
func (s *Service) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
return nil, nil
func (s *Service) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
return tso.SyncMaxTS(ctx, s.server, request)
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from the PD leader's TSO allocator manager.
func (s *Service) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
return nil, nil
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
return tso.GetDCLocationInfo(ctx, s.server, request)
}

// SetExternalTimestamp sets a given external timestamp to perform stale read.
func (s *Service) SetExternalTimestamp(ctx context.Context, request *pdpb.SetExternalTimestampRequest) (*pdpb.SetExternalTimestampResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}
return nil, nil
}

// GetExternalTimestamp gets the saved external timestamp.
func (s *Service) GetExternalTimestamp(ctx context.Context, request *pdpb.GetExternalTimestampRequest) (*pdpb.GetExternalTimestampResponse, error) {
return nil, nil
}

// validateInternalRequest checks if the server is closed; it is used to validate
// gRPC communication between TSO servers internally.
// TODO: check if the sender is from the global TSO allocator
func (s *Service) validateInternalRequest(_ *pdpb.RequestHeader) error {
if s.server.IsClosed() {
return ErrNotStarted
}
return nil
}

// validateRequest checks if the server is the leader and the cluster ID matches.
// TODO: Call it in gRPC interceptor.
func (s *Service) validateRequest(header *pdpb.RequestHeader) error {
if s.server.IsClosed() || !s.server.GetMember().IsLeader() {
return ErrNotLeader
}
if header.GetClusterId() != s.server.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.server.clusterID, header.GetClusterId())
}
return nil
}
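
Aside: a minimal, hypothetical sketch of how a peer might call the SyncMaxTS endpoint added above. The dial options, timeout, and the assumption that the endpoint is reached through the generated pdpb PD client (rather than a tsopb client) are illustrative, not part of this commit:

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func syncMaxTSExample(addr string, clusterID uint64, maxTS *pdpb.Timestamp) (*pdpb.SyncMaxTSResponse, error) {
	// Dial the TSO server; production code would reuse a pooled, TLS-secured connection.
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// With SkipCheck unset, the server compares MaxTs against the Local TSOs
	// it holds and reports the biggest one back (see SyncMaxTS in pkg/tso).
	return pdpb.NewPDClient(conn).SyncMaxTS(ctx, &pdpb.SyncMaxTSRequest{
		Header: &pdpb.RequestHeader{ClusterId: clusterID},
		MaxTs:  maxTS,
	})
}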
40 changes: 40 additions & 0 deletions pkg/mcs/tso/server/server.go
@@ -16,19 +16,23 @@ package server

import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/cluster"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)

@@ -177,6 +181,42 @@ func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*
return client.(*grpc.ClientConn), nil
}

// GetGlobalTS returns the global TSO.
func (s *Server) GetGlobalTS() (uint64, error) {
ts, err := s.tsoAllocatorManager.GetGlobalTSO()
if err != nil {
return 0, err
}
return tsoutil.GenerateTS(ts), nil
}
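
// For reference (not part of this commit): the uint64 produced here is the
// usual TSO packing, assumed to be physical<<18 | logical, since PD reserves
// 18 bits for the logical part. Plain uint64 comparisons in SetExternalTS
// below therefore order timestamps the same way as comparing
// (physical, logical) pairs. A hypothetical round-trip:
//
//	ts := &pdpb.Timestamp{Physical: 1676102400000, Logical: 5}
//	v := tsoutil.GenerateTS(ts) // assumed: uint64(1676102400000)<<18 | 5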

// GetExternalTS returns the external timestamp.
// TODO: Implement GetExternalTS. Get the external TS from persistent storage.
func (s *Server) GetExternalTS() uint64 {
return 0
}

// SetExternalTS sets the external timestamp.
func (s *Server) SetExternalTS(externalTS uint64) error {
globalTS, err := s.GetGlobalTS()
if err != nil {
return err
}
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
desc := "the external timestamp should not be larger than global ts"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than now"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("current external timestamp", currentExternalTS))
return errors.New(desc)
}
// TODO: persist externalTS
return nil
}
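
// Illustration (hypothetical values, not part of this commit) of how the two
// guards above interact. Suppose globalTS is 105 and the stored external
// timestamp is 100:
//
//	SetExternalTS(110) -> rejected: larger than the global TS (would read the future)
//	SetExternalTS(95)  -> rejected: not larger than the current external timestamp
//	SetExternalTS(103) -> accepted: 100 < 103 <= 105
//
// CompareTimestampUint64 returns 1 only when its first argument is strictly
// larger, which is what the == 1 and != 1 checks above rely on.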

// TODO: If the goroutine here times out after a stream is created successfully, we need to handle it correctly.
func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
select {
147 changes: 140 additions & 7 deletions pkg/tso/grpc_server.go
@@ -16,15 +16,18 @@ package tso

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -47,7 +50,13 @@ type GrpcServer interface {
GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error)
}

// Tso implements gRPC PDServer.
type tsoRequest struct {
forwardedHost string
request *pdpb.TsoRequest
stream pdpb.PD_TsoServer
}

// Tso implements gRPC Server.
func Tso(s GrpcServer, stream pdpb.PD_TsoServer) error {
var (
doneCh chan struct{}
@@ -113,6 +122,136 @@ func Tso(s GrpcServer, stream pdpb.PD_TsoServer) error {
}
}

// Only used for the TestLocalAllocatorLeaderChange.
var mockLocalAllocatorLeaderChangeFlag = false

// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this server is holding when skipCheck is set,
// and, if it is indeed the biggest one, write it into all Local TSO Allocators.
func SyncMaxTS(_ context.Context, s GrpcServer, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
tsoAllocatorManager := s.GetTSOAllocatorManager()
// If there is no dc-location found in this server, return an error.
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN,
"empty cluster dc-location found, checker may not work properly"),
}, nil
}
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
if err != nil {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if !request.GetSkipCheck() {
var maxLocalTS *pdpb.Timestamp
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
// No longer leader, just skip here because
// the global allocator will check if all DCs are handled.
if !allocator.IsAllocatorLeader() {
continue
}
currentLocalTSO, err := allocator.GetCurrentTSO()
if err != nil {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}

failpoint.Inject("mockLocalAllocatorLeaderChange", func() {
if !mockLocalAllocatorLeaderChangeFlag {
maxLocalTS = nil
request.MaxTs = nil
mockLocalAllocatorLeaderChangeFlag = true
}
})
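
// A test would typically arm this failpoint before the first SyncMaxTS call,
// e.g. (the exact failpoint path is an assumption derived from this package's
// import path):
//
//	failpoint.Enable("github.com/tikv/pd/pkg/tso/mockLocalAllocatorLeaderChange", "return(true)")
//	defer failpoint.Disable("github.com/tikv/pd/pkg/tso/mockLocalAllocatorLeaderChange")
//
// which makes this function simulate a local allocator leader change exactly
// once by clearing both maxLocalTS and request.MaxTs.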

if maxLocalTS == nil {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN,
"local tso allocator leaders have changed during the sync, should retry"),
}, nil
}
if request.GetMaxTs() == nil {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN,
"empty maxTS in the request, should retry"),
}, nil
}
// Found a bigger or equal maxLocalTS, return it directly.
cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs())
if cmpResult >= 0 {
// Found an equal maxLocalTS, plus 1 to the logical part before returning it.
// For example, we have a Global TSO t1 and a Local TSO t2 that have the
// same physical and logical parts. After being differentiated with the suffix,
// there will be (t1.logical << suffixNum + 0) < (t2.logical << suffixNum + N),
// where N is bigger than 0, which would cause the Global TSO to fall back behind the previous Local TSO.
if cmpResult == 0 {
maxLocalTS.Logical += 1
}
return &pdpb.SyncMaxTSResponse{
Header: Header(s.ClusterID()),
MaxLocalTs: maxLocalTS,
SyncedDcs: syncedDCs,
}, nil
}
}
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
if !allocator.IsAllocatorLeader() {
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return &pdpb.SyncMaxTSResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: Header(s.ClusterID()),
SyncedDcs: syncedDCs,
}, nil
}
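
// Worked example for the cmpResult == 0 branch above (illustrative numbers,
// not from this commit). With 2 suffix bits, a Global TSO and a Local TSO
// that share the same physical time and the same raw logical value L diverge
// once suffixed:
//
//	global = L<<2 | 0 // the global allocator always carries suffix 0
//	local  = L<<2 | 3 // a local allocator carries a non-zero suffix, e.g. 3
//
// global < local even though the unsuffixed timestamps compared equal, so
// returning maxLocalTS unchanged would let the next Global TSO fall behind an
// already-issued Local TSO; bumping Logical by 1 avoids that fallback.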

// GetDCLocationInfo gets the dc-location info of the given dc-location from the primary's TSO allocator manager.
func GetDCLocationInfo(ctx context.Context, s GrpcServer, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
am := s.GetTSOAllocatorManager()
info, ok := am.GetDCLocationInfo(request.GetDcLocation())
if !ok {
am.ClusterDCLocationChecker()
return &pdpb.GetDCLocationInfoResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN,
fmt.Sprintf("dc-location %s is not found", request.GetDcLocation())),
}, nil
}
resp := &pdpb.GetDCLocationInfoResponse{
Header: Header(s.ClusterID()),
Suffix: info.Suffix,
}
// Because the number of suffix bits changes dynamically according to the number of dc-locations,
// there is a corner case that may cause the Local TSO to be non-unique during membership changes.
// Example:
// t1: xxxxxxxxxxxxxxx1 | 11
// t2: xxxxxxxxxxxxxxx | 111
// So we will force the newly added Local TSO Allocator to have a Global TSO synchronization
// when it becomes the Local TSO Allocator leader.
// Please take a look at https://github.com/tikv/pd/issues/3260 for more details.
var err error
if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil {
return &pdpb.GetDCLocationInfoResponse{
Header: WrapErrorToHeader(s.ClusterID(), pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
return resp, nil
}
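
// Worked example of the corner case above (illustrative bit patterns). While
// suffixes occupy 2 bits, a Local TSO with raw logical 1 is encoded as
//
//	1<<2 | 0b11 = 0b111 = 7
//
// If another dc-location joins and the suffix grows to 3 bits, a different
// allocator with raw logical 0 can encode
//
//	0<<3 | 0b111 = 0b111 = 7
//
// the same final logical value. This non-uniqueness is why a newly added
// Local TSO Allocator is forced through a Global TSO synchronization first.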

// Header is a helper function to wrap the response header for the given cluster ID.
func Header(clusterID uint64) *pdpb.ResponseHeader {
if clusterID == 0 {
@@ -139,12 +278,6 @@ func ErrorHeader(clusterID uint64, err *pdpb.Error) *pdpb.ResponseHeader {
}
}

type tsoRequest struct {
forwardedHost string
request *pdpb.TsoRequest
stream pdpb.PD_TsoServer
}

func dispatchTSORequest(ctx context.Context, s GrpcServer, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
tsoRequestChInterface, loaded := s.GetTSODispatcher().LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
if !loaded {
