From 7867e25cda7db3a0d3ad54ec2cd9bd5800bc4710 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 24 Jan 2024 18:53:32 +0800 Subject: [PATCH 1/7] wip Signed-off-by: Ping Yu --- internal/client/client_batch.go | 7 +++++++ internal/locate/region_request.go | 8 ++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 49b8fe04ed..4f0f0ec174 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -795,6 +795,9 @@ func sendBatchRequest( logutil.Logger(ctx).Debug("send request is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) + case <-batchConn.closed: + logutil.Logger(ctx).Debug("send request is cancelled (batchConn is closed)", zap.String("to", addr)) + return nil, errors.New("batchConn is closed") case <-timer.C: return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop") } @@ -812,6 +815,10 @@ func sendBatchRequest( logutil.Logger(ctx).Debug("wait response is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) + case <-batchConn.closed: + atomic.StoreInt32(&entry.canceled, 1) + logutil.Logger(ctx).Debug("wait response is cancelled (batchConn is closed)", zap.String("to", addr)) + return nil, errors.New("batchConn is closed") case <-timer.C: atomic.StoreInt32(&entry.canceled, 1) reason := fmt.Sprintf("wait recvLoop timeout,timeout:%s, wait_duration:%s:", timeout, waitDuration) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index dbd9836579..947a5cceb5 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1781,7 +1781,7 @@ func (s *RegionRequestSender) sendReqToRegion( return nil, false, err } } - if e := s.onSendFail(bo, rpcCtx, req, err); e != nil { + if e := s.onSendFail(bo, rpcCtx, sendToAddr, req, err); e != nil { return nil, false, err } return nil, true, nil @@ -1811,7 +1811,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { logutil.BgLogger().Warn("release store token failed, count equals to 0") } -func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error { +func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, addr string, req *tikvrpc.Request, err error) error { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1836,8 +1836,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r // For the case of canceled by keepalive, we need to re-establish the connection, otherwise following requests will always fail. // Canceled by gRPC remote may happen when tikv is killed and exiting. // Close the connection, backoff, and retry. - logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.Error(err)) - s.client.CloseAddr(ctx.Addr) + logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.String("addr", addr), zap.Error(err)) + s.client.CloseAddr(addr) } } From 19b74cb4b5d9199a6abe99ecd870d79cbd5ff6fc Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 24 Jan 2024 21:34:37 +0800 Subject: [PATCH 2/7] CloseAddr with ver Signed-off-by: Ping Yu --- .golangci.yml | 11 ++++ integration_tests/async_commit_test.go | 2 +- integration_tests/pd_api_test.go | 4 +- internal/client/client.go | 65 ++++++++++++++++++---- internal/client/client_batch.go | 6 +- internal/client/client_interceptor_test.go | 2 +- internal/client/client_test.go | 12 +++- internal/locate/region_request.go | 8 ++- internal/locate/region_request_test.go | 7 ++- internal/mockstore/mocktikv/rpc.go | 2 +- tikv/kv_test.go | 4 +- 11 files changed, 97 insertions(+), 26 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2929ea46fa..9d2918bcbf 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -19,3 +19,14 @@ linters: - typecheck - unconvert - unused + +linters-settings: + depguard: + rules: + main: + list-mode: lax # everything is allowed unless it is denied. + deny: + - pkg: "log" + desc: logging is allowed only by pingcap/log + - pkg: "github.com/juju/errors" + desc: error handling is allowed only by pingcap/errors diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 5817fa1a1c..14f7e8e263 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -76,7 +76,7 @@ type unistoreClientWrapper struct { *unistore.RPCClient } -func (c *unistoreClientWrapper) CloseAddr(addr string) error { +func (c *unistoreClientWrapper) CloseAddr(addr string, ver uint64) error { return nil } diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index a990c9cad7..1fe9ec5007 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -95,8 +95,8 @@ func (c *storeSafeTsMockClient) Close() error { return c.Client.Close() } -func (c *storeSafeTsMockClient) CloseAddr(addr string) error { - return c.Client.CloseAddr(addr) +func (c *storeSafeTsMockClient) CloseAddr(addr string, ver uint64) error { + return c.Client.CloseAddr(addr, ver) } func (s *apiTestSuite) TestGetStoresMinResolvedTS() { diff --git a/internal/client/client.go b/internal/client/client.go index 05d73bfbf5..77f61b97b5 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -104,14 +104,42 @@ type Client interface { // Close should release all data. Close() error // CloseAddr closes gRPC connections to the address. It will reconnect the next time it's used. - CloseAddr(addr string) error + // `ver` is used to avoid unnecessary closing after the address has been reconnected. + // Pass `math.Uint64` if you want to close forcedly. + CloseAddr(addr string, ver uint64) error // SendRequest sends Request. SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) } +type ErrConn struct { + Err error + Addr string + Ver uint64 +} + +func (e *ErrConn) Error() string { + return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error()) +} + +func (e *ErrConn) Unwrap() error { + return e.Err +} + +func WrapErrConn(err error, conn *connArray) error { + if err == nil { + return nil + } + return &ErrConn{ + Err: err, + Addr: conn.target, + Ver: conn.ver, + } +} + type connArray struct { // The target host. target string + ver uint64 index uint32 v []*monitoredConn @@ -125,9 +153,10 @@ type connArray struct { monitor *connMonitor } -func newConnArray(maxSize uint, addr string, security config.Security, +func newConnArray(maxSize uint, addr string, ver uint64, security config.Security, idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) { a := &connArray{ + ver: ver, index: 0, v: make([]*monitoredConn, maxSize), streamTimeout: make(chan *tikvrpc.Lease, 1024), @@ -390,6 +419,7 @@ type RPCClient struct { sync.RWMutex conns map[string]*connArray + vers map[string]uint64 option *option idleNotify uint32 @@ -405,6 +435,7 @@ type RPCClient struct { func NewRPCClient(opts ...Opt) *RPCClient { cli := &RPCClient{ conns: make(map[string]*connArray), + vers: make(map[string]uint64), option: &option{ dialTimeout: dialTimeout, }, @@ -452,9 +483,11 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func( for _, opt := range opts { opt(&client) } + ver := c.vers[addr] + 1 array, err = newConnArray( client.GrpcConnectionCount, addr, + ver, c.option.security, &c.idleNotify, enableBatch, @@ -466,6 +499,7 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func( return nil, err } c.conns[addr] = array + c.vers[addr] = ver } return array, nil } @@ -603,6 +637,10 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R return nil, err } + wrapErrConn := func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) { + return resp, WrapErrConn(err, connArray) + } + start := time.Now() staleRead := req.GetStaleRead() defer func() { @@ -625,7 +663,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch { if batchReq := req.ToBatchCommandsRequest(); batchReq != nil { defer trace.StartRegion(ctx, req.Type.String()).End() - return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout) + return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)) } } @@ -639,7 +677,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R client := debugpb.NewDebugClient(clientConn) ctx1, cancel := context.WithTimeout(ctx, timeout) defer cancel() - return tikvrpc.CallDebugRPC(ctx1, client, req) + return wrapErrConn(tikvrpc.CallDebugRPC(ctx1, client, req)) } client := tikvpb.NewTikvClient(clientConn) @@ -650,16 +688,16 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R } switch req.Type { case tikvrpc.CmdBatchCop: - return c.getBatchCopStreamResponse(ctx, client, req, timeout, connArray) + return wrapErrConn(c.getBatchCopStreamResponse(ctx, client, req, timeout, connArray)) case tikvrpc.CmdCopStream: - return c.getCopStreamResponse(ctx, client, req, timeout, connArray) + return wrapErrConn(c.getCopStreamResponse(ctx, client, req, timeout, connArray)) case tikvrpc.CmdMPPConn: - return c.getMPPStreamResponse(ctx, client, req, timeout, connArray) + return wrapErrConn(c.getMPPStreamResponse(ctx, client, req, timeout, connArray)) } // Or else it's a unary call. ctx1, cancel := context.WithTimeout(ctx, timeout) defer cancel() - return tikvrpc.CallRPC(ctx1, client, req) + return wrapErrConn(tikvrpc.CallRPC(ctx1, client, req)) } // SendRequest sends a Request to server and receives Response. @@ -792,12 +830,17 @@ func (c *RPCClient) Close() error { } // CloseAddr closes gRPC connections to the address. -func (c *RPCClient) CloseAddr(addr string) error { +func (c *RPCClient) CloseAddr(addr string, ver uint64) error { c.Lock() conn, ok := c.conns[addr] if ok { - delete(c.conns, addr) - logutil.BgLogger().Debug("close connection", zap.String("target", addr)) + if conn.ver <= ver { + delete(c.conns, addr) + logutil.BgLogger().Debug("close connection", zap.String("target", addr), zap.Uint64("ver", ver), zap.Uint64("conn.ver", conn.ver)) + } else { + logutil.BgLogger().Debug("ignore close connection", zap.String("target", addr), zap.Uint64("ver", ver), zap.Uint64("conn.ver", conn.ver)) + conn = nil + } } c.Unlock() diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 4f0f0ec174..9b7dabe0d4 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -830,16 +830,18 @@ func (c *RPCClient) recycleIdleConnArray() { start := time.Now() var addrs []string + var vers []uint64 c.RLock() for _, conn := range c.conns { if conn.batchConn != nil && conn.isIdle() { addrs = append(addrs, conn.target) + vers = append(vers, conn.ver) } } c.RUnlock() - for _, addr := range addrs { - c.CloseAddr(addr) + for i, addr := range addrs { + c.CloseAddr(addr, vers[i]) } metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 88fc0af7e8..4250b372c7 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -35,7 +35,7 @@ func (c emptyClient) Close() error { return nil } -func (c emptyClient) CloseAddr(addr string) error { +func (c emptyClient) CloseAddr(addr string, ver uint64) error { return nil } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 2934c1241c..ec77847046 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -79,12 +79,18 @@ func TestConn(t *testing.T) { assert.Nil(t, err) assert.False(t, conn2.Get() == conn1.Get()) - assert.Nil(t, client.CloseAddr(addr)) + ver := conn2.ver + assert.Nil(t, client.CloseAddr(addr, ver-1)) _, ok := client.conns[addr] + assert.True(t, ok) + assert.Nil(t, client.CloseAddr(addr, ver)) + _, ok = client.conns[addr] assert.False(t, ok) + conn3, err := client.getConnArray(addr, true) assert.Nil(t, err) assert.NotNil(t, conn3) + assert.Equal(t, ver+1, conn3.ver) client.Close() conn4, err := client.getConnArray(addr, true) @@ -99,7 +105,7 @@ func TestGetConnAfterClose(t *testing.T) { addr := "127.0.0.1:6379" connArray, err := client.getConnArray(addr, true) assert.Nil(t, err) - assert.Nil(t, client.CloseAddr(addr)) + assert.Nil(t, client.CloseAddr(addr, connArray.ver)) conn := connArray.Get() state := conn.GetState() assert.True(t, state == connectivity.Shutdown) @@ -149,7 +155,7 @@ func (c *chanClient) Close() error { return nil } -func (c *chanClient) CloseAddr(addr string) error { +func (c *chanClient) CloseAddr(addr string, ver uint64) error { return nil } diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 947a5cceb5..8bad6c9e95 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1837,7 +1837,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, a // Canceled by gRPC remote may happen when tikv is killed and exiting. // Close the connection, backoff, and retry. logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.String("addr", addr), zap.Error(err)) - s.client.CloseAddr(addr) + var errConn client.ErrConn + if errors.As(err, &errConn) { + logutil.Logger(bo.GetCtx()).Debug("close connection", zap.Error(&errConn)) + s.client.CloseAddr(errConn.Addr, errConn.Ver) + } } } @@ -2195,7 +2199,7 @@ func (s *RegionRequestSender) onRegionError( s.regionCache.InvalidateCachedRegion(ctx.Region) // It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment, // so we always reconnect in this case. - s.client.CloseAddr(ctx.Addr) + s.client.CloseAddr(ctx.Addr, math.MaxUint64) return false, nil } diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index c16626accc..fb1913e240 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -37,6 +37,7 @@ package locate import ( "context" "fmt" + "math" "math/rand" "net" "sync" @@ -99,14 +100,16 @@ func (s *testRegionRequestToSingleStoreSuite) TearDownTest() { type fnClient struct { fn func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) closedAddr string + closedVer uint64 } func (f *fnClient) Close() error { return nil } -func (f *fnClient) CloseAddr(addr string) error { +func (f *fnClient) CloseAddr(addr string, ver uint64) error { f.closedAddr = addr + f.closedVer = ver return nil } @@ -664,6 +667,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) s.Equal(target, client.closedAddr) + var expected uint64 = math.MaxUint64 + s.Equal(expected, client.closedVer) } func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() { diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index 09e31aa99f..c3da01e14e 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -1092,6 +1092,6 @@ func (c *RPCClient) Close() error { } // CloseAddr does nothing. -func (c *RPCClient) CloseAddr(addr string) error { +func (c *RPCClient) CloseAddr(addr string, ver uint64) error { return nil } diff --git a/tikv/kv_test.go b/tikv/kv_test.go index 9f9af85006..ae07b1d0dd 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -101,8 +101,8 @@ func (c *storeSafeTsMockClient) Close() error { return c.Client.Close() } -func (c *storeSafeTsMockClient) CloseAddr(addr string) error { - return c.Client.CloseAddr(addr) +func (c *storeSafeTsMockClient) CloseAddr(addr string, ver uint64) error { + return c.Client.CloseAddr(addr, ver) } func (s *testKVSuite) TestMinSafeTs() { From 10e939426bb8bb4ba5798c9fb2cdff5a50285a50 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 25 Jan 2024 15:23:00 +0800 Subject: [PATCH 3/7] fix ErrConn Signed-off-by: Ping Yu --- internal/client/client_test.go | 31 +++++++++++++++++++++++++++++++ internal/locate/region_request.go | 4 ++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index ec77847046..b77c8ce0c7 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -729,3 +729,34 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { require.NoError(t, err) } } + +func TestErrConn(t *testing.T) { + e := errors.New("conn error") + err1 := &ErrConn{Err: e, Addr: "addr", Ver: 10} + err2 := &ErrConn{Err: e, Addr: "addr", Ver: 10} + + e3 := errors.New("conn error 3") + err3 := &ErrConn{Err: e3} + + err4 := errors.New("not ErrConn") + + assert.True(t, errors.Is(err1, err1)) + assert.True(t, errors.Is(fmt.Errorf("%w", err1), err1)) + assert.False(t, errors.Is(fmt.Errorf("%w", err2), err1)) // err2 != err1 + assert.False(t, errors.Is(fmt.Errorf("%w", err4), err1)) + + var errConn *ErrConn + assert.True(t, errors.As(err1, &errConn)) + assert.Equal(t, "addr", errConn.Addr) + assert.EqualValues(t, 10, errConn.Ver) + assert.EqualError(t, e, errConn.Err.Error()) + + assert.True(t, errors.As(err3, &errConn)) + assert.EqualError(t, e3, errConn.Err.Error()) + + assert.False(t, errors.As(err4, &errConn)) + + errMsg := errors.New("unknown") + assert.True(t, errors.As(err1, &errMsg)) + assert.EqualError(t, err1, errMsg.Error()) +} diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8bad6c9e95..e64cc25420 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1837,9 +1837,9 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, a // Canceled by gRPC remote may happen when tikv is killed and exiting. // Close the connection, backoff, and retry. logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.String("addr", addr), zap.Error(err)) - var errConn client.ErrConn + var errConn *client.ErrConn if errors.As(err, &errConn) { - logutil.Logger(bo.GetCtx()).Debug("close connection", zap.Error(&errConn)) + logutil.Logger(bo.GetCtx()).Debug("close connection", zap.Error(errConn)) s.client.CloseAddr(errConn.Addr, errConn.Ver) } } From c902466140b4662370273ef9b913f80660c87c8c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 25 Jan 2024 16:02:23 +0800 Subject: [PATCH 4/7] fix ut Signed-off-by: Ping Yu --- internal/client/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index b77c8ce0c7..134cad9c52 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -141,7 +141,7 @@ func TestSendWhenReconnect(t *testing.T) { req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) _, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second) - assert.True(t, err.Error() == "no available connections") + assert.EqualError(t, err, fmt.Sprintf("[%s](%d) no available connections", addr, 1)) server.Stop() } From 3852b54df308676564679842a7074d6a7a952dac Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 25 Jan 2024 19:08:24 +0800 Subject: [PATCH 5/7] polish Signed-off-by: Ping Yu --- integration_tests/async_commit_test.go | 2 +- integration_tests/pd_api_test.go | 4 +-- internal/client/client.go | 16 ++++++++--- internal/client/client_batch.go | 2 +- internal/client/client_interceptor_test.go | 2 +- internal/client/client_test.go | 8 +++--- internal/locate/region_request.go | 24 +++++++++++++--- internal/locate/region_request_test.go | 32 +++++++++++++++++++++- internal/mockstore/mocktikv/rpc.go | 2 +- tikv/kv_test.go | 4 +-- 10 files changed, 75 insertions(+), 21 deletions(-) diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 14f7e8e263..5817fa1a1c 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -76,7 +76,7 @@ type unistoreClientWrapper struct { *unistore.RPCClient } -func (c *unistoreClientWrapper) CloseAddr(addr string, ver uint64) error { +func (c *unistoreClientWrapper) CloseAddr(addr string) error { return nil } diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 1fe9ec5007..a76ecc5da5 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -95,8 +95,8 @@ func (c *storeSafeTsMockClient) Close() error { return c.Client.Close() } -func (c *storeSafeTsMockClient) CloseAddr(addr string, ver uint64) error { - return c.Client.CloseAddr(addr, ver) +func (c *storeSafeTsMockClient) CloseAddrVer(addr string) error { + return c.Client.CloseAddr(addr) } func (s *apiTestSuite) TestGetStoresMinResolvedTS() { diff --git a/internal/client/client.go b/internal/client/client.go index 77f61b97b5..f19bc678aa 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -104,13 +104,17 @@ type Client interface { // Close should release all data. Close() error // CloseAddr closes gRPC connections to the address. It will reconnect the next time it's used. - // `ver` is used to avoid unnecessary closing after the address has been reconnected. - // Pass `math.Uint64` if you want to close forcedly. - CloseAddr(addr string, ver uint64) error + CloseAddr(addr string) error // SendRequest sends Request. SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) } +type ClientExt interface { + // `ver` is used to avoid unnecessary closing after the address has been reconnected. + // Pass `math.Uint64` if you want to close forcedly. + CloseAddrVer(addr string, ver uint64) error +} + type ErrConn struct { Err error Addr string @@ -830,7 +834,11 @@ func (c *RPCClient) Close() error { } // CloseAddr closes gRPC connections to the address. -func (c *RPCClient) CloseAddr(addr string, ver uint64) error { +func (c *RPCClient) CloseAddr(addr string) error { + return c.CloseAddrVer(addr, math.MaxUint64) +} + +func (c *RPCClient) CloseAddrVer(addr string, ver uint64) error { c.Lock() conn, ok := c.conns[addr] if ok { diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 9b7dabe0d4..6df1165f89 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -841,7 +841,7 @@ func (c *RPCClient) recycleIdleConnArray() { c.RUnlock() for i, addr := range addrs { - c.CloseAddr(addr, vers[i]) + c.CloseAddrVer(addr, vers[i]) } metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 4250b372c7..88fc0af7e8 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -35,7 +35,7 @@ func (c emptyClient) Close() error { return nil } -func (c emptyClient) CloseAddr(addr string, ver uint64) error { +func (c emptyClient) CloseAddr(addr string) error { return nil } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 134cad9c52..ec204b84a1 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -80,10 +80,10 @@ func TestConn(t *testing.T) { assert.False(t, conn2.Get() == conn1.Get()) ver := conn2.ver - assert.Nil(t, client.CloseAddr(addr, ver-1)) + assert.Nil(t, client.CloseAddrVer(addr, ver-1)) _, ok := client.conns[addr] assert.True(t, ok) - assert.Nil(t, client.CloseAddr(addr, ver)) + assert.Nil(t, client.CloseAddrVer(addr, ver)) _, ok = client.conns[addr] assert.False(t, ok) @@ -105,7 +105,7 @@ func TestGetConnAfterClose(t *testing.T) { addr := "127.0.0.1:6379" connArray, err := client.getConnArray(addr, true) assert.Nil(t, err) - assert.Nil(t, client.CloseAddr(addr, connArray.ver)) + assert.Nil(t, client.CloseAddrVer(addr, connArray.ver)) conn := connArray.Get() state := conn.GetState() assert.True(t, state == connectivity.Shutdown) @@ -155,7 +155,7 @@ func (c *chanClient) Close() error { return nil } -func (c *chanClient) CloseAddr(addr string, ver uint64) error { +func (c *chanClient) CloseAddr(addr string) error { return nil } diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index e64cc25420..6d25adf85c 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -105,6 +105,7 @@ type RegionRequestSender struct { regionCache *RegionCache apiVersion kvrpcpb.APIVersion client client.Client + clientExt client.ClientExt storeAddr string rpcError error replicaSelector *replicaSelector @@ -197,11 +198,21 @@ func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, } // NewRegionRequestSender creates a new sender. -func NewRegionRequestSender(regionCache *RegionCache, client client.Client) *RegionRequestSender { +func NewRegionRequestSender(regionCache *RegionCache, cli client.Client) *RegionRequestSender { + // check whether client implements ClientExt interface + // var clientExt client.ClientExt + // if _, ok := client.(client.ClientExt); ok { + // clientExt = client + // } + + var i interface{} = cli + cliExt, _ := i.(client.ClientExt) + return &RegionRequestSender{ regionCache: regionCache, apiVersion: regionCache.codec.GetAPIVersion(), - client: client, + client: cli, + clientExt: cliExt, } } @@ -1840,7 +1851,12 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, a var errConn *client.ErrConn if errors.As(err, &errConn) { logutil.Logger(bo.GetCtx()).Debug("close connection", zap.Error(errConn)) - s.client.CloseAddr(errConn.Addr, errConn.Ver) + + if s.clientExt != nil { + s.clientExt.CloseAddrVer(errConn.Addr, errConn.Ver) + } else { + s.client.CloseAddr(errConn.Addr) + } } } } @@ -2199,7 +2215,7 @@ func (s *RegionRequestSender) onRegionError( s.regionCache.InvalidateCachedRegion(ctx.Region) // It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment, // so we always reconnect in this case. - s.client.CloseAddr(ctx.Addr, math.MaxUint64) + s.client.CloseAddr(ctx.Addr) return false, nil } diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index fb1913e240..3f0886b1e3 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -107,7 +107,11 @@ func (f *fnClient) Close() error { return nil } -func (f *fnClient) CloseAddr(addr string, ver uint64) error { +func (f *fnClient) CloseAddr(addr string) error { + return f.CloseAddrVer(addr, math.MaxUint64) +} + +func (f *fnClient) CloseAddrVer(addr string, ver uint64) error { f.closedAddr = addr f.closedVer = ver return nil @@ -829,3 +833,29 @@ func (s *testRegionRequestToSingleStoreSuite) TestCountReplicaNumber() { s.Equal(4, s.regionRequestSender.countReplicaNumber(peers)) // Only count 1 tiflash replica for tiflash write-nodes. } } + +type emptyClient struct{} + +func (c emptyClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + return nil, nil +} + +func (c emptyClient) Close() error { + return nil +} + +func (c emptyClient) CloseAddr(addr string) error { + return nil +} + +func (s *testRegionRequestToSingleStoreSuite) TestClientExt() { + var cli client.Client = client.NewRPCClient() + sender := NewRegionRequestSender(s.cache, cli) + s.NotNil(sender.client) + s.NotNil(sender.clientExt) + + cli = &emptyClient{} + sender = NewRegionRequestSender(s.cache, cli) + s.NotNil(sender.client) + s.Nil(sender.clientExt) +} diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index c3da01e14e..09e31aa99f 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -1092,6 +1092,6 @@ func (c *RPCClient) Close() error { } // CloseAddr does nothing. -func (c *RPCClient) CloseAddr(addr string, ver uint64) error { +func (c *RPCClient) CloseAddr(addr string) error { return nil } diff --git a/tikv/kv_test.go b/tikv/kv_test.go index ae07b1d0dd..9f9af85006 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -101,8 +101,8 @@ func (c *storeSafeTsMockClient) Close() error { return c.Client.Close() } -func (c *storeSafeTsMockClient) CloseAddr(addr string, ver uint64) error { - return c.Client.CloseAddr(addr, ver) +func (c *storeSafeTsMockClient) CloseAddr(addr string) error { + return c.Client.CloseAddr(addr) } func (s *testKVSuite) TestMinSafeTs() { From b3641a2ece51134381420018fb87b3c2b28e8900 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 25 Jan 2024 20:25:53 +0800 Subject: [PATCH 6/7] polish Signed-off-by: Ping Yu --- .golangci.yml | 11 --------- integration_tests/pd_api_test.go | 2 +- internal/client/client.go | 10 +++++--- internal/client/client_batch.go | 7 ------ internal/client/client_test.go | 10 ++++---- internal/locate/region_request.go | 34 +++++++++++--------------- internal/locate/region_request_test.go | 19 ++++---------- 7 files changed, 32 insertions(+), 61 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 9d2918bcbf..2929ea46fa 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -19,14 +19,3 @@ linters: - typecheck - unconvert - unused - -linters-settings: - depguard: - rules: - main: - list-mode: lax # everything is allowed unless it is denied. - deny: - - pkg: "log" - desc: logging is allowed only by pingcap/log - - pkg: "github.com/juju/errors" - desc: error handling is allowed only by pingcap/errors diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index a76ecc5da5..a990c9cad7 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -95,7 +95,7 @@ func (c *storeSafeTsMockClient) Close() error { return c.Client.Close() } -func (c *storeSafeTsMockClient) CloseAddrVer(addr string) error { +func (c *storeSafeTsMockClient) CloseAddr(addr string) error { return c.Client.CloseAddr(addr) } diff --git a/internal/client/client.go b/internal/client/client.go index f19bc678aa..1b9f41fd26 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -109,12 +109,15 @@ type Client interface { SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) } +// ClientExt is a client has extended interfaces. type ClientExt interface { - // `ver` is used to avoid unnecessary closing after the address has been reconnected. - // Pass `math.Uint64` if you want to close forcedly. + // CloseAddrVer closes gRPC connections to the address with additional `ver` parameter. + // Each new connection will have an incremented `ver` value, and attempts to close a previous `ver` will be ignored. + // Passing `math.MaxUint64` as the `ver` parameter will forcefully close all connections to the address. CloseAddrVer(addr string, ver uint64) error } +// ErrConn wraps error with target address and version of the connection. type ErrConn struct { Err error Addr string @@ -143,7 +146,8 @@ func WrapErrConn(err error, conn *connArray) error { type connArray struct { // The target host. target string - ver uint64 + // version of the connection array, increase by 1 when reconnect. + ver uint64 index uint32 v []*monitoredConn diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 6df1165f89..44b3a3fd68 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -795,9 +795,6 @@ func sendBatchRequest( logutil.Logger(ctx).Debug("send request is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) - case <-batchConn.closed: - logutil.Logger(ctx).Debug("send request is cancelled (batchConn is closed)", zap.String("to", addr)) - return nil, errors.New("batchConn is closed") case <-timer.C: return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop") } @@ -815,10 +812,6 @@ func sendBatchRequest( logutil.Logger(ctx).Debug("wait response is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) - case <-batchConn.closed: - atomic.StoreInt32(&entry.canceled, 1) - logutil.Logger(ctx).Debug("wait response is cancelled (batchConn is closed)", zap.String("to", addr)) - return nil, errors.New("batchConn is closed") case <-timer.C: atomic.StoreInt32(&entry.canceled, 1) reason := fmt.Sprintf("wait recvLoop timeout,timeout:%s, wait_duration:%s:", timeout, waitDuration) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index ec204b84a1..c44d725de6 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -105,7 +105,7 @@ func TestGetConnAfterClose(t *testing.T) { addr := "127.0.0.1:6379" connArray, err := client.getConnArray(addr, true) assert.Nil(t, err) - assert.Nil(t, client.CloseAddrVer(addr, connArray.ver)) + assert.Nil(t, client.CloseAddr(addr)) conn := connArray.Get() state := conn.GetState() assert.True(t, state == connectivity.Shutdown) @@ -732,8 +732,8 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { func TestErrConn(t *testing.T) { e := errors.New("conn error") - err1 := &ErrConn{Err: e, Addr: "addr", Ver: 10} - err2 := &ErrConn{Err: e, Addr: "addr", Ver: 10} + err1 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10} + err2 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10} e3 := errors.New("conn error 3") err3 := &ErrConn{Err: e3} @@ -747,9 +747,9 @@ func TestErrConn(t *testing.T) { var errConn *ErrConn assert.True(t, errors.As(err1, &errConn)) - assert.Equal(t, "addr", errConn.Addr) + assert.Equal(t, "127.0.0.1", errConn.Addr) assert.EqualValues(t, 10, errConn.Ver) - assert.EqualError(t, e, errConn.Err.Error()) + assert.EqualError(t, errConn.Err, "conn error") assert.True(t, errors.As(err3, &errConn)) assert.EqualError(t, e3, errConn.Err.Error()) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 6d25adf85c..428100363c 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -105,7 +105,6 @@ type RegionRequestSender struct { regionCache *RegionCache apiVersion kvrpcpb.APIVersion client client.Client - clientExt client.ClientExt storeAddr string rpcError error replicaSelector *replicaSelector @@ -198,21 +197,11 @@ func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, } // NewRegionRequestSender creates a new sender. -func NewRegionRequestSender(regionCache *RegionCache, cli client.Client) *RegionRequestSender { - // check whether client implements ClientExt interface - // var clientExt client.ClientExt - // if _, ok := client.(client.ClientExt); ok { - // clientExt = client - // } - - var i interface{} = cli - cliExt, _ := i.(client.ClientExt) - +func NewRegionRequestSender(regionCache *RegionCache, client client.Client) *RegionRequestSender { return &RegionRequestSender{ regionCache: regionCache, apiVersion: regionCache.codec.GetAPIVersion(), - client: cli, - clientExt: cliExt, + client: client, } } @@ -226,6 +215,13 @@ func (s *RegionRequestSender) GetClient() client.Client { return s.client } +// getClientExt returns the RPC client with extension functions. +// Don't use in critical path. +func (s *RegionRequestSender) getClientExt() client.ClientExt { + ext, _ := s.client.(client.ClientExt) + return ext +} + // SetStoreAddr specifies the dest store address. func (s *RegionRequestSender) SetStoreAddr(addr string) { s.storeAddr = addr @@ -1792,7 +1788,7 @@ func (s *RegionRequestSender) sendReqToRegion( return nil, false, err } } - if e := s.onSendFail(bo, rpcCtx, sendToAddr, req, err); e != nil { + if e := s.onSendFail(bo, rpcCtx, req, err); e != nil { return nil, false, err } return nil, true, nil @@ -1822,7 +1818,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { logutil.BgLogger().Warn("release store token failed, count equals to 0") } -func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, addr string, req *tikvrpc.Request, err error) error { +func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1847,13 +1843,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, a // For the case of canceled by keepalive, we need to re-establish the connection, otherwise following requests will always fail. // Canceled by gRPC remote may happen when tikv is killed and exiting. // Close the connection, backoff, and retry. - logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.String("addr", addr), zap.Error(err)) + logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal", zap.Error(err)) var errConn *client.ErrConn if errors.As(err, &errConn) { - logutil.Logger(bo.GetCtx()).Debug("close connection", zap.Error(errConn)) - - if s.clientExt != nil { - s.clientExt.CloseAddrVer(errConn.Addr, errConn.Ver) + if ext := s.getClientExt(); ext != nil { + ext.CloseAddrVer(errConn.Addr, errConn.Ver) } else { s.client.CloseAddr(errConn.Addr) } diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 3f0886b1e3..856371524b 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -834,28 +834,19 @@ func (s *testRegionRequestToSingleStoreSuite) TestCountReplicaNumber() { } } -type emptyClient struct{} - -func (c emptyClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - return nil, nil -} - -func (c emptyClient) Close() error { - return nil -} - -func (c emptyClient) CloseAddr(addr string) error { - return nil +type emptyClient struct { + client.Client } func (s *testRegionRequestToSingleStoreSuite) TestClientExt() { var cli client.Client = client.NewRPCClient() sender := NewRegionRequestSender(s.cache, cli) s.NotNil(sender.client) - s.NotNil(sender.clientExt) + s.NotNil(sender.getClientExt()) + cli.Close() cli = &emptyClient{} sender = NewRegionRequestSender(s.cache, cli) s.NotNil(sender.client) - s.Nil(sender.clientExt) + s.Nil(sender.getClientExt()) } From 5d173365771dadcb4bb849962adf2d9aab03a131 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 25 Jan 2024 20:34:14 +0800 Subject: [PATCH 7/7] polish Signed-off-by: Ping Yu --- internal/client/client_test.go | 2 +- internal/locate/region_request.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index c44d725de6..143ccf9b36 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -752,7 +752,7 @@ func TestErrConn(t *testing.T) { assert.EqualError(t, errConn.Err, "conn error") assert.True(t, errors.As(err3, &errConn)) - assert.EqualError(t, e3, errConn.Err.Error()) + assert.EqualError(t, e3, "conn error 3") assert.False(t, errors.As(err4, &errConn)) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 428100363c..727ec0898f 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -215,7 +215,8 @@ func (s *RegionRequestSender) GetClient() client.Client { return s.client } -// getClientExt returns the RPC client with extension functions. +// getClientExt returns the client with ClientExt interface. +// Return nil if the client does not implement ClientExt. // Don't use in critical path. func (s *RegionRequestSender) getClientExt() client.ClientExt { ext, _ := s.client.(client.ClientExt)