Skip to content

Commit

Permalink
mcs: update client when meet transport is closing (tikv#6341)
Browse files Browse the repository at this point in the history
* mcs: update client when meet transport is closing

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* add retry

Signed-off-by: lhy1024 <[email protected]>

---------

Signed-off-by: lhy1024 <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 7205af4 commit b82194d
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"path"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -48,7 +49,8 @@ import (
)

const (
heartbeatSendTimeout = 5 * time.Second
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
)

// gRPC errors
Expand Down Expand Up @@ -1773,24 +1775,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err := s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(&tsopb.TsoRequest{
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
KeyspaceId: utils.DefaultKeyspaceID,
KeyspaceGroupId: utils.DefaultKeyspaceGroupID,
},
Count: 1,
})
ts, err := forwardStream.Recv()
if err != nil {
log.Error("get global tso from tso server failed", zap.Error(err))
return pdpb.Timestamp{}, err
}
return *ts.GetTimestamp(), nil
var (
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
continue
}
log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}
return *ts.GetTimestamp(), nil
}
log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}

func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {
Expand Down

0 comments on commit b82194d

Please sign in to comment.