From b82194df089708a94f71779ec636ddf7c8a89137 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sun, 23 Apr 2023 12:12:44 +0800 Subject: [PATCH] mcs: update client when meet transport is closing (#6341) * mcs: update client when meet transport is closing Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * add retry Signed-off-by: lhy1024 --------- Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/grpc_service.go | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 48ac9875415..d9883e4453f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -20,6 +20,7 @@ import ( "io" "path" "strconv" + "strings" "sync/atomic" "time" @@ -48,7 +49,8 @@ import ( ) const ( - heartbeatSendTimeout = 5 * time.Second + heartbeatSendTimeout = 5 * time.Second + maxRetryTimesGetGlobalTSOFromTSOServer = 3 ) // gRPC errors @@ -1773,24 +1775,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr } - forwardStream, err := s.getTSOForwardStream(forwardedHost) - if err != nil { - return pdpb.Timestamp{}, err - } - forwardStream.Send(&tsopb.TsoRequest{ + request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, KeyspaceId: utils.DefaultKeyspaceID, KeyspaceGroupId: utils.DefaultKeyspaceGroupID, }, Count: 1, - }) - ts, err := forwardStream.Recv() - if err != nil { - log.Error("get global tso from tso server failed", zap.Error(err)) - return pdpb.Timestamp{}, err } - return *ts.GetTimestamp(), nil + var ( + forwardStream tsopb.TSO_TsoClient + ts *tsopb.TsoResponse + err error + ) + for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardStream, err = s.getTSOForwardStream(forwardedHost) + if err != nil { + return pdpb.Timestamp{}, err + } + forwardStream.Send(request) + ts, err = forwardStream.Recv() + if err != nil { + if strings.Contains(err.Error(), codes.Unavailable.String()) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + continue + } + log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err + } + return *ts.GetTimestamp(), nil + } + log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {