Skip to content

Commit

Permalink
add retry
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Apr 19, 2023
1 parent c8820e5 commit aa4caf2
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ import (
)

const (
heartbeatSendTimeout = 5 * time.Second
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
)

// gRPC errors
Expand Down Expand Up @@ -1774,29 +1775,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err := s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(&tsopb.TsoRequest{
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
KeyspaceId: utils.DefaultKeyspaceID,
KeyspaceGroupId: utils.DefaultKeyspaceGroupID,
},
Count: 1,
})
ts, err := forwardStream.Recv()
if err != nil {
log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso", forwardedHost))
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
}
var (
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
continue
}
log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}
return pdpb.Timestamp{}, err
return *ts.GetTimestamp(), nil
}
return *ts.GetTimestamp(), nil
log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}

func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {
Expand Down

0 comments on commit aa4caf2

Please sign in to comment.