From d6d9feab3e2a5180acbcc7095723d43e97798686 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 21 Feb 2024 13:15:26 +0800 Subject: [PATCH] client: enabling TSO Follower Proxy with TSO service should fail (#7833) ref tikv/pd#5836 Since the TSO service does not support the TSO Follower Proxy, enabling it in this case should fail. Signed-off-by: JmPotato --- client/client.go | 3 +++ client/tso_dispatcher.go | 11 +++++++++- tests/integrations/client/client_test.go | 27 ++++++++++++++++++++++++ tests/integrations/tso/client_test.go | 4 ---- 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index ba7ac7fd075..802a377cbf1 100644 --- a/client/client.go +++ b/client/client.go @@ -702,6 +702,9 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { return err } case EnableTSOFollowerProxy: + if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { + return errors.New("[pd] tso follower proxy is only supported in PD service mode") + } enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index cb4c243a56f..c5136d7fd09 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -372,6 +372,9 @@ func (c *tsoClient) handleDispatcher( return case <-c.option.enableTSOFollowerProxyCh: enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) if enableTSOFollowerProxy && updateTicker.C == nil { // Because the TSO Follower Proxy is enabled, // the periodic check needs to be performed. @@ -701,7 +704,11 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s } // GC the stale one. connectionCtxs.Range(func(addr, cc any) bool { - if _, ok := tsoStreamBuilders[addr.(string)]; !ok { + addrStr := addr.(string) + if _, ok := tsoStreamBuilders[addrStr]; !ok { + log.Info("[tso] remove the stale tso stream", + zap.String("dc", dc), + zap.String("addr", addrStr)) cc.(*tsoConnectionContext).cancel() connectionCtxs.Delete(addr) } @@ -712,6 +719,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s if _, ok = connectionCtxs.Load(addr); ok { continue } + log.Info("[tso] try to create tso stream", + zap.String("dc", dc), zap.String("addr", addr)) cctx, cancel := context.WithCancel(dispatcherCtx) // Do not proxy the leader client. if addr != leaderAddr { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index cfb896878f2..3e1b0be472e 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -24,6 +24,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -39,6 +40,7 @@ import ( "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" @@ -49,6 +51,7 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" "go.etcd.io/etcd/clientv3" "go.uber.org/goleak" ) @@ -319,6 +322,30 @@ func TestTSOFollowerProxy(t *testing.T) { wg.Wait() } +func TestTSOFollowerProxyWithTSOService(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestAPICluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + err = cluster.RunInitialServers() + re.NoError(err) + leaderName := cluster.WaitLeader() + pdLeaderServer := cluster.GetServer(leaderName) + re.NoError(pdLeaderServer.BootstrapCluster()) + backendEndpoints := pdLeaderServer.GetAddr() + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints) + re.NoError(err) + defer tsoCluster.Destroy() + cli := mcs.SetupClientWithKeyspaceID(ctx, re, utils.DefaultKeyspaceID, strings.Split(backendEndpoints, ",")) + re.NotNil(cli) + defer cli.Close() + // TSO service does not support the follower proxy, so enabling it should fail. + err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.Error(err) +} + // TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207 func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re := require.New(t) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index c5cc6ec5d6d..3d7b099f342 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -269,10 +269,6 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. func (suite *tsoClientTestSuite) TestGetMinTS() { re := suite.Require() - if !suite.legacy { - suite.waitForAllKeyspaceGroupsInServing(re) - } - var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) for i := 0; i < tsoRequestConcurrencyNumber; i++ {