From 188d0d88c23857dd9d6469c09757f5531ef1380f Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Mon, 12 Jun 2023 23:45:54 -0700 Subject: [PATCH] ms, tso: pick some enhancements (#113) * Add keyspace and keyspace group info to the time fallback log. (#6581) ref tikv/pd#5895 Add keyspace and keyspace group info to the time fallback log to help debug time fallback issues in multi-timeline scenarios. Signed-off-by: Bin Shi * config, server: fault injection in TSO Proxy (#6588) ref tikv/pd#5895 Add failure test cases. Signed-off-by: Bin Shi * mcs, tso: fix stream Send() and CloseSend() data race issue in TSO proxy (#6591) close tikv/pd#6590 No need to call CloseSend(), because the TSO proxy cancels the stream context, which causes the corresponding gRPC stream on the server side to exit. Signed-off-by: Bin Shi Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * tests: make TestSplitKeyspaceGroup stable (#6584) close tikv/pd#6571 Signed-off-by: lhy1024 --------- Signed-off-by: Bin Shi Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: lhy1024 --- client/tso_dispatcher.go | 6 +- pkg/keyspace/keyspace_test.go | 2 +- pkg/keyspace/tso_keyspace_group.go | 55 +++++++++------ pkg/keyspace/tso_keyspace_group_test.go | 2 +- server/cluster/cluster.go | 2 +- server/config/config.go | 14 ++-- server/grpc_service.go | 40 ++++++----- server/server.go | 6 +- tests/integrations/mcs/tso/proxy_test.go | 74 +++++++++++++++++++++ tests/pdctl/keyspace/keyspace_group_test.go | 50 +++++++++++++- tests/pdctl/scheduler/scheduler_test.go | 6 +- 11 files changed, 203 insertions(+), 54 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d2d62814619..30e7e670e09 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -739,8 +739,10 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then // all TSOs we get will be [6, 7, 8, 9, 10]. if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) { - panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)", - dcLocation, physical, firstLogical, lastPhysical, lastLogical)) + panic(errors.Errorf( + "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d", + dcLocation, physical, firstLogical, lastPhysical, lastLogical, + c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID())) } lastTSOPointer.physical = physical // Same as above, we save the largest logical part here.
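Note: the fault-injection changes in this patch rely on the pingcap/failpoint package: a test enables a named failpoint at runtime and a guarded code path in the server reacts to it. The sketch below is a minimal, self-contained illustration of that mechanism and is not part of the patch; the failpoint name "example/tsoTimeout" and the helper function are hypothetical. In PD, the failpoint.Inject/failpoint.Return markers added to grpc_service.go are rewritten into equivalent failpoint.Eval checks by the failpoint-ctl build step before these tests run.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// timeoutInjected guards a code path the way PD's rewritten failpoint markers
// do: failpoint.Eval only succeeds while the named failpoint is enabled.
// "example/tsoTimeout" is a hypothetical name used for illustration only.
func timeoutInjected() bool {
	if _, err := failpoint.Eval("example/tsoTimeout"); err == nil {
		return true
	}
	return false
}

func main() {
	fmt.Println("before enable:", timeoutInjected()) // false

	// A test flips the behavior at runtime, mirroring what proxy_test.go does
	// with failpoints such as tsoProxyRecvFromClientTimeout.
	if err := failpoint.Enable("example/tsoTimeout", `return(true)`); err != nil {
		panic(err)
	}
	fmt.Println("after enable:", timeoutInjected()) // true

	if err := failpoint.Disable("example/tsoTimeout"); err != nil {
		panic(err)
	}
	fmt.Println("after disable:", timeoutInjected()) // false
}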
diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index 948fe434088..dadc2a2509f 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() { allocator := mockid.NewIDAllocator() kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0) suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm) - suite.NoError(kgm.Bootstrap()) + suite.NoError(kgm.Bootstrap(suite.ctx)) suite.NoError(suite.manager.Bootstrap()) } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 2a477c3dea1..efe905b6439 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "strings" "sync" "time" @@ -51,9 +52,11 @@ const ( // GroupManager is the manager of keyspace group related data. type GroupManager struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + client *clientv3.Client + clusterID uint64 sync.RWMutex // groups is the cache of keyspace group related information. @@ -90,24 +93,24 @@ func NewKeyspaceGroupManager( cancel: cancel, store: store, groups: groups, + client: client, + clusterID: clusterID, nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy), serviceRegistryMap: make(map[string]string), } // If the etcd client is not nil, start the watch loop for the registered tso servers. // The PD(TSO) Client relies on this info to discover tso servers. - if client != nil { - m.initTSONodesWatcher(client, clusterID) - m.wg.Add(2) + if m.client != nil { + m.initTSONodesWatcher(m.client, m.clusterID) + m.wg.Add(1) go m.tsoNodesWatcher.StartWatchLoop() - go m.allocNodesToAllKeyspaceGroups() } - return m } // Bootstrap saves default keyspace group info and init group mapping in the memory. -func (m *GroupManager) Bootstrap() error { +func (m *GroupManager) Bootstrap(ctx context.Context) error { // Force the membership restriction that the default keyspace must belong to default keyspace group. // Have no information to specify the distribution of the default keyspace group replicas, so just // leave the replica/member list empty. The TSO service will assign the default keyspace group replica @@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error { m.groups[userKind].Put(group) } + // It will only alloc node when the group manager is on API leader. 
+ if m.client != nil { + m.wg.Add(1) + go m.allocNodesToAllKeyspaceGroups(ctx) + } return nil } @@ -146,7 +154,7 @@ func (m *GroupManager) Close() { m.wg.Wait() } -func (m *GroupManager) allocNodesToAllKeyspaceGroups() { +func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { defer logutil.LogPanic() defer m.wg.Done() ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval) @@ -158,7 +166,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { log.Info("start to alloc nodes to all keyspace groups") for { select { - case <-m.ctx.Done(): + case <-ctx.Done(): log.Info("stop to alloc nodes to all keyspace groups") return case <-ticker.C: @@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro Members: keyspaceGroup.Members, Keyspaces: keyspaceGroup.Keyspaces, } - if oldKG.IsSplitting() { - newKG.SplitState = &endpoint.SplitState{ - SplitSource: oldKG.SplitState.SplitSource, - } - } err = m.store.SaveKeyspaceGroup(txn, newKG) if err != nil { return err @@ -380,6 +383,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) return config, nil } +var failpointOnce sync.Once + // UpdateKeyspaceForGroup updates the keyspace field for the keyspace group. func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error { // when server is not in API mode, we don't need to update the keyspace for keyspace group @@ -391,6 +396,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI return err } + failpoint.Inject("externalAllocNode", func(val failpoint.Value) { + failpointOnce.Do(func() { + addrs := val.(string) + m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ",")) + }) + }) m.Lock() defer m.Unlock() return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation) @@ -425,7 +436,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil { return err } - m.groups[userKind].Put(kg) } return nil @@ -696,8 +706,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error { m.Lock() defer m.Unlock() - return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { - kg, err := m.store.LoadKeyspaceGroup(txn, id) + var kg *endpoint.KeyspaceGroup + err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + var err error + kg, err = m.store.LoadKeyspaceGroup(txn, id) if err != nil { return err } @@ -714,6 +726,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error kg.Members = members return m.store.SaveKeyspaceGroup(txn, kg) }) + if err != nil { + return err + } + m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg) + return nil } // IsExistNode checks if the node exists. 
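Note: the SetNodesForKeyspaceGroup change above restructures the update so that the keyspace group loaded inside the storage transaction escapes the closure, and the in-memory groups cache is refreshed only after the transaction has succeeded (previously the function returned straight from the transaction and never refreshed the cache). Below is a rough, self-contained sketch of that ordering, using stand-in types rather than PD's kv.Txn and endpoint.KeyspaceGroup APIs:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// keyspaceGroup and fakeStore are illustrative stand-ins, not PD types.
type keyspaceGroup struct {
	ID    uint32
	Nodes []string
}

type fakeStore struct {
	mu   sync.Mutex
	data map[uint32]*keyspaceGroup
}

// runInTxn mimics "load + save atomically"; a real transaction can fail, in
// which case the caller must not touch its cache.
func (s *fakeStore) runInTxn(fn func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn()
}

// setNodes mirrors the shape of SetNodesForKeyspaceGroup after this patch:
// the loaded group escapes the transaction closure, and the in-memory cache
// is updated only once the transaction has committed successfully.
func setNodes(store *fakeStore, cache map[uint32]*keyspaceGroup, id uint32, nodes []string) error {
	var kg *keyspaceGroup
	err := store.runInTxn(func() error {
		var ok bool
		kg, ok = store.data[id]
		if !ok {
			return errors.New("keyspace group not found")
		}
		kg.Nodes = nodes // "save" inside the txn
		return nil
	})
	if err != nil {
		return err
	}
	cache[id] = kg // cache refresh happens outside the txn, and only on success
	return nil
}

func main() {
	store := &fakeStore{data: map[uint32]*keyspaceGroup{0: {ID: 0}}}
	cache := map[uint32]*keyspaceGroup{}
	if err := setNodes(store, cache, 0, []string{"127.0.0.1:2379", "127.0.0.1:2380"}); err != nil {
		panic(err)
	}
	fmt.Println("cached nodes:", cache[0].Nodes)
}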
diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index e7a7fe246f7..6422536a8d3 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { idAllocator := mockid.NewIDAllocator() cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions()) suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm) - suite.NoError(suite.kgm.Bootstrap()) + suite.NoError(suite.kgm.Bootstrap(suite.ctx)) } func (suite *keyspaceGroupTestSuite) TearDownTest() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 6017d965df8..395251235e4 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -270,7 +270,7 @@ func (c *RaftCluster) Start(s Server) error { } if s.IsAPIServiceMode() { - err = c.keyspaceGroupManager.Bootstrap() + err = c.keyspaceGroupManager.Bootstrap(c.ctx) if err != nil { return err } diff --git a/server/config/config.go b/server/config/config.go index 8eb999250e4..0008f04bbc3 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -85,9 +85,9 @@ type Config struct { // Set this to 0 will disable TSO Proxy. // Set this to the negative value to disable the limit. MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"` - // TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream. + // TSOProxyRecvFromClientTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream. // After the timeout, the TSO proxy will close the grpc TSO stream. - TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"` + TSOProxyRecvFromClientTimeout typeutil.Duration `toml:"tso-proxy-recv-from-client-timeout" json:"tso-proxy-recv-from-client-timeout"` // TSOSaveInterval is the interval to save timestamp. TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"` @@ -228,7 +228,7 @@ const ( defaultDRWaitStoreTimeout = time.Minute defaultMaxConcurrentTSOProxyStreamings = 5000 - defaultTSOProxyClientRecvTimeout = 1 * time.Hour + defaultTSOProxyRecvFromClientTimeout = 1 * time.Hour defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second // defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`. @@ -458,7 +458,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { } configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings) - configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout) + configutil.AdjustDuration(&c.TSOProxyRecvFromClientTimeout, defaultTSOProxyRecvFromClientTimeout) configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease) configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval) @@ -1266,9 +1266,9 @@ func (c *Config) GetMaxConcurrentTSOProxyStreamings() int { return c.MaxConcurrentTSOProxyStreamings } -// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout. -func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration { - return c.TSOProxyClientRecvTimeout.Duration +// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client. 
+func (c *Config) GetTSOProxyRecvFromClientTimeout() time.Duration { + return c.TSOProxyRecvFromClientTimeout.Duration } // GetTSOUpdatePhysicalInterval returns TSO update physical interval. diff --git a/server/grpc_service.go b/server/grpc_service.go index 6027acb1736..1f14ebed52e 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -69,7 +69,7 @@ var ( ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") - ErrTSOProxyClientRecvTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server") + ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") ) // GrpcServer wraps Server to provide grpc service. @@ -409,9 +409,6 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { ) defer func() { s.concurrentTSOProxyStreamings.Add(-1) - if forwardStream != nil { - forwardStream.CloseSend() - } // cancel the forward stream if cancel != nil { cancel() @@ -433,7 +430,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { default: } - request, err := server.Recv(s.GetTSOProxyClientRecvTimeout()) + request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout()) if err == io.EOF { return nil } @@ -450,9 +447,6 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { return errors.WithStack(ErrNotFoundTSOAddr) } if forwardStream == nil || lastForwardedHost != forwardedHost { - if forwardStream != nil { - forwardStream.CloseSend() - } if cancel != nil { cancel() } @@ -548,6 +542,11 @@ func (s *GrpcServer) forwardTSORequestAsync( DcLocation: request.GetDcLocation(), } + failpoint.Inject("tsoProxySendToTSOTimeout", func() { + <-ctxTimeout.Done() + failpoint.Return() + }) + if err := forwardStream.Send(tsopbReq); err != nil { select { case <-ctxTimeout.Done(): @@ -563,23 +562,21 @@ func (s *GrpcServer) forwardTSORequestAsync( default: } + failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() { + <-ctxTimeout.Done() + failpoint.Return() + }) + response, err := forwardStream.Recv() if err != nil { if strings.Contains(err.Error(), errs.NotLeaderErr) { s.tsoPrimaryWatcher.ForceLoad() } - select { - case <-ctxTimeout.Done(): - return - case tsoRespCh <- &tsopbTSOResponse{err: err}: - } - return } - select { case <-ctxTimeout.Done(): return - case tsoRespCh <- &tsopbTSOResponse{response: response}: + case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}: } } @@ -607,6 +604,10 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error { done := make(chan error, 1) go func() { defer logutil.LogPanic() + failpoint.Inject("tsoProxyFailToSendToClient", func() { + done <- errors.New("injected error") + failpoint.Return() + }) done <- s.stream.Send(m) }() select { @@ -625,6 +626,11 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { if atomic.LoadInt32(&s.closed) == 1 { return nil, io.EOF } + failpoint.Inject("tsoProxyRecvFromClientTimeout", func(val failpoint.Value) { + if customTimeoutInSeconds, ok := val.(int); ok { + timeout = time.Duration(customTimeoutInSeconds) * time.Second + } + }) requestCh := make(chan *pdpbTSORequest, 1) go func() { defer logutil.LogPanic() @@ -640,7 +646,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, 
error) { return req.request, nil case <-time.After(timeout): atomic.StoreInt32(&s.closed, 1) - return nil, ErrTSOProxyClientRecvTimeout + return nil, ErrTSOProxyRecvFromClientTimeout } } diff --git a/server/server.go b/server/server.go index 5085ecb4329..55ef327af7f 100644 --- a/server/server.go +++ b/server/server.go @@ -1844,9 +1844,9 @@ func (s *Server) GetMaxConcurrentTSOProxyStreamings() int { return s.cfg.GetMaxConcurrentTSOProxyStreamings() } -// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout. -func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration { - return s.cfg.GetTSOProxyClientRecvTimeout() +// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client. +func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration { + return s.cfg.GetTSOProxyRecvFromClientTimeout() } // GetLeaderLease returns the leader lease. diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index d4d73a74b83..d873665c258 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/stretchr/testify/require" @@ -194,6 +195,79 @@ func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() { s.cleanupGRPCStreams(cleanupFuncs) } +// TestTSOProxyRecvFromClientTimeout tests that the TSO Proxy can properly close the grpc stream on the server side +// when the client does not send any request to the server for a long time. +func (s *tsoProxyTestSuite) TestTSOProxyRecvFromClientTimeout() { + re := s.Require() + + // Enable the failpoint to shorten the TSO Proxy's grpc stream receive timeout on the server side to 1 second. + re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout", `return(1)`)) + streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + // Sleep 2 seconds to make the TSO Proxy's grpc stream timeout on the server side. + time.Sleep(2 * time.Second) + err := streams[0].Send(s.defaultReq) + re.Error(err) + s.cleanupGRPCStreams(cleanupFuncs) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout")) + + // Verify the streams with no fault injection can work correctly. + s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true) +} + +// TestTSOProxyFailToSendToClient tests that the TSO Proxy can properly close the grpc stream on the server side +// when it fails to send the response to the client. +func (s *tsoProxyTestSuite) TestTSOProxyFailToSendToClient() { + re := s.Require() + + // Enable the failpoint to make the TSO Proxy fail to send the response to the client. + re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyFailToSendToClient", `return(true)`)) + streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + err := streams[0].Send(s.defaultReq) + re.NoError(err) + _, err = streams[0].Recv() + re.Error(err) + s.cleanupGRPCStreams(cleanupFuncs) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyFailToSendToClient")) + + s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true) +} + +// TestTSOProxySendToTSOTimeout tests that the TSO Proxy can properly close the grpc stream on the server side +// when forwarding the request to the TSO service times out.
+func (s *tsoProxyTestSuite) TestTSOProxySendToTSOTimeout() { + re := s.Require() + + // Enable the failpoint to make the TSO Proxy time out when forwarding the request to the TSO service. + re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout", `return(true)`)) + streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + err := streams[0].Send(s.defaultReq) + re.NoError(err) + _, err = streams[0].Recv() + re.Error(err) + s.cleanupGRPCStreams(cleanupFuncs) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout")) + + s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true) +} + +// TestTSOProxyRecvFromTSOTimeout tests that the TSO Proxy can properly close the grpc stream on the server side +// when receiving the response from the TSO service times out. +func (s *tsoProxyTestSuite) TestTSOProxyRecvFromTSOTimeout() { + re := s.Require() + + // Enable the failpoint to make the TSO Proxy time out when receiving the response from the TSO service. + re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout", `return(true)`)) + streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + err := streams[0].Send(s.defaultReq) + re.NoError(err) + _, err = streams[0].Recv() + re.Error(err) + s.cleanupGRPCStreams(cleanupFuncs) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout")) + + s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true) +} + func (s *tsoProxyTestSuite) cleanupGRPCStreams(cleanupFuncs []testutil.CleanupFunc) { for i := 0; i < len(cleanupFuncs); i++ { if cleanupFuncs[i] != nil { diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 28cf968e04c..133edb78579 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -96,7 +96,8 @@ func TestSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) keyspaces := make([]string, 0) - for i := 0; i < 500; i++ { + // Test the case that exceeds the default max txn ops limit in etcd, which is 128. + for i := 0; i < 129; i++ { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { @@ -126,8 +127,53 @@ func TestSplitKeyspaceGroup(t *testing.T) { output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) return strings.Contains(string(output), "Success") - }, testutil.WithWaitFor(20*time.Second)) + }) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) +} + +func TestExternalAllocNodeWhenStart(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Nodes are allocated externally for the keyspace group; when the keyspace manager updates keyspace info to the keyspace group, + // we expect the keyspace group to be updated correctly.
+ re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/externalAllocNode", `return("127.0.0.1:2379,127.0.0.1:2380")`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + keyspaces := make([]string, 0) + for i := 0; i < 10; i++ { + keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) + } + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = keyspaces + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + cmd := pdctlCmd.GetRootCmd() + + time.Sleep(2 * time.Second) + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + // check keyspace group information. + defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) + args := []string{"-u", pdAddr, "keyspace-group"} + testutil.Eventually(re, func() bool { + output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...) + re.NoError(err) + var keyspaceGroup endpoint.KeyspaceGroup + err = json.Unmarshal(output, &keyspaceGroup) + re.NoError(err) + return len(keyspaceGroup.Keyspaces) == len(keyspaces)+1 && len(keyspaceGroup.Members) == 2 + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/externalAllocNode")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 12813c8b097..d1defc4ba61 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -460,7 +460,11 @@ func TestScheduler(t *testing.T) { checkSchedulerWithStatusCommand(nil, "paused", []string{ "balance-leader-scheduler", }) - checkSchedulerDescribeCommand("balance-leader-scheduler", "paused", "") + result := make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) + return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" + }, testutil.WithWaitFor(30*time.Second)) mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)