diff --git a/server/grpc_service.go b/server/grpc_service.go index f0939a45043..d9883e4453f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -49,7 +49,8 @@ import ( ) const ( - heartbeatSendTimeout = 5 * time.Second + heartbeatSendTimeout = 5 * time.Second + maxRetryTimesGetGlobalTSOFromTSOServer = 3 ) // gRPC errors @@ -1774,29 +1775,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr } - forwardStream, err := s.getTSOForwardStream(forwardedHost) - if err != nil { - return pdpb.Timestamp{}, err - } - forwardStream.Send(&tsopb.TsoRequest{ + request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, KeyspaceId: utils.DefaultKeyspaceID, KeyspaceGroupId: utils.DefaultKeyspaceGroupID, }, Count: 1, - }) - ts, err := forwardStream.Recv() - if err != nil { - log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso", forwardedHost)) - if strings.Contains(err.Error(), codes.Unavailable.String()) { - s.tsoClientPool.Lock() - delete(s.tsoClientPool.clients, forwardedHost) - s.tsoClientPool.Unlock() + } + var ( + forwardStream tsopb.TSO_TsoClient + ts *tsopb.TsoResponse + err error + ) + for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardStream, err = s.getTSOForwardStream(forwardedHost) + if err != nil { + return pdpb.Timestamp{}, err + } + forwardStream.Send(request) + ts, err = forwardStream.Recv() + if err != nil { + if strings.Contains(err.Error(), codes.Unavailable.String()) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + continue + } + log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } - return pdpb.Timestamp{}, err + return *ts.GetTimestamp(), nil } - return *ts.GetTimestamp(), nil + log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {