Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106) #9117

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
This is an automated cherry-pick of #9106
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Jun 1, 2023
commit a095c39d6a39051db432f7becea1becc626ce10e
1 change: 1 addition & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *Capture) reset(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
log.Info("reset session successfully", zap.Any("session", sess))

c.captureMu.Lock()
defer c.captureMu.Unlock()
Expand Down
30 changes: 30 additions & 0 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -124,6 +125,18 @@ func (s *Server) Run(ctx context.Context) error {
logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

<<<<<<< HEAD:cdc/server.go
=======
log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
>>>>>>> e1826b37cb (etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106)):cdc/server/server.go
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
Expand All @@ -142,6 +155,10 @@ func (s *Server) Run(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
if err != nil {
Expand Down Expand Up @@ -207,9 +224,13 @@ func (s *Server) startStatusHTTP(lis net.Listener) error {
return nil
}

<<<<<<< HEAD:cdc/server.go
func (s *Server) etcdHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()
=======
func (s *server) etcdHealthChecker(ctx context.Context) error {
>>>>>>> e1826b37cb (etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106)):cdc/server/server.go
conf := config.GetGlobalServerConfig()

httpCli, err := httputil.NewClient(conf.Security)
Expand All @@ -222,6 +243,9 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
metrics[pdEndpoint] = etcdHealthCheckDuration.WithLabelValues(pdEndpoint)
}

ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -240,6 +264,12 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
}
cancel()
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = s.etcdClient.GetEtcdClient().Unwrap().MemberList(ctx)
cancel()
if err != nil {
log.Warn("etcd health check error, fail to list etcd members", zap.Error(err))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
)

// set to var instead of const for mocking the value to speedup test
var maxTries uint64 = 8
var maxTries uint64 = 12

// Client is a simple wrapper that adds retry to etcd RPC
type Client struct {
Expand Down
15 changes: 15 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func removeVAndHash(v string) string {
return strings.TrimPrefix(v, "v")
}

var checkClusterVersionRetryTimes = 10

// CheckClusterVersion check TiKV and PD version.
// need only one PD alive and match the cdc version.
func CheckClusterVersion(
Expand All @@ -85,7 +87,18 @@ func CheckClusterVersion(
}

for _, pdAddr := range pdAddrs {
<<<<<<< HEAD
err = CheckPDVersion(ctx, pdAddr, credential)
=======
// check pd version with retry, if the pdAddr is a service or lb address
// the http client may connect to an unhealthy PD that returns 503
err = retry.Do(ctx, func() error {
return checkPDVersion(ctx, pdAddr, credential)
}, retry.WithBackoffBaseDelay(time.Millisecond.Milliseconds()*10),
retry.WithBackoffMaxDelay(time.Second.Milliseconds()),
retry.WithMaxTries(uint64(checkClusterVersionRetryTimes)),
retry.WithIsRetryableErr(cerror.IsRetryableError))
>>>>>>> e1826b37cb (etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106))
if err == nil {
break
}
Expand All @@ -106,6 +119,8 @@ func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
return err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
resp, err := httpClient.Get(ctx, fmt.Sprintf("%s/pd/api/v1/version", pdAddr))
if err != nil {
return cerror.ErrCheckClusterVersionFromPD.GenWithStackByArgs(err)
Expand Down
39 changes: 39 additions & 0 deletions pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,45 @@ func TestCheckClusterVersion(t *testing.T) {
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false)
require.Regexp(t, ".*400 Bad Request.*", err)
}
<<<<<<< HEAD
=======

// check retry success
{
retryTimes := 0
mock.getStatusCode = func() int {
if retryTimes < 4 {
retryTimes++
return http.StatusServiceUnavailable
}
return http.StatusOK
}

mock.getPDVersion = func() string {
return "7.9.9"
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: "v7.9.9"}}
}
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
require.Nil(t, err)
}

// check retry failed
{
retryTimes := 0
mock.getStatusCode = func() int {
if retryTimes < checkClusterVersionRetryTimes {
retryTimes++
return http.StatusBadRequest
}
return http.StatusOK
}

err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false)
require.Regexp(t, ".*400 Bad Request.*", err)
}
>>>>>>> e1826b37cb (etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106))
}

func TestCompareVersion(t *testing.T) {
Expand Down