From 2f2447a0d3b3cfe16133ce1c86752bec3ffff9d8 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Mon, 27 Jul 2020 23:59:14 +0800
Subject: [PATCH] cherry pick #2675 to release-4.0 (#2678)

Signed-off-by: ti-srebot
Co-authored-by: Haizhi Geng
---
 server/server.go             | 20 +++++++++++++++
 server/tso/tso.go            | 18 +++++++++++---
 tests/cluster.go             | 14 +++++++++++
 tests/server/tso/tso_test.go | 47 ++++++++++++++++++++++++++++++++++++
 4 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/server/server.go b/server/server.go
index 5ad2d9a9119..b6af4df34e4 100644
--- a/server/server.go
+++ b/server/server.go
@@ -99,6 +99,10 @@ type Server struct {
 	serverLoopCancel func()
 	serverLoopWg     sync.WaitGroup
 
+	// leader lease
+	lease   *member.LeaderLease
+	leaseMu sync.RWMutex
+
 	member *member.Member
 	// etcd client
 	client *clientv3.Client
@@ -640,6 +644,20 @@ func (s *Server) GetMember() *member.Member {
 	return s.member
 }
 
+// GetLease returns the lease of the member; only the leader server's lease is not nil.
+func (s *Server) GetLease() *member.LeaderLease {
+	s.leaseMu.RLock()
+	defer s.leaseMu.RUnlock()
+	return s.lease
+}
+
+// SetLease changes the lease.
+func (s *Server) SetLease(lease *member.LeaderLease) {
+	s.leaseMu.Lock()
+	defer s.leaseMu.Unlock()
+	s.lease = lease
+}
+
 // GetStorage returns the backend storage of server.
 func (s *Server) GetStorage() *core.Storage {
 	return s.storage
@@ -1101,6 +1119,8 @@ func (s *Server) campaignLeader() {
 	ctx, cancel := context.WithCancel(s.serverLoopCtx)
 	defer cancel()
 	go lease.KeepAlive(ctx)
+	s.SetLease(lease)
+	defer s.SetLease(nil)
 	log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))
 
 	log.Debug("sync timestamp for tso")
diff --git a/server/tso/tso.go b/server/tso/tso.go
index 3c39e44adee..bf2ca3f565f 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -94,6 +94,12 @@ func (t *TimestampOracle) checkLease() bool {
 	return t.lease != nil && !t.lease.IsExpired()
 }
 
+func (t *TimestampOracle) setLease(lease *member.LeaderLease) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.lease = lease
+}
+
 // save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
 // otherwise, update it.
 func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
@@ -119,6 +125,12 @@ func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
 func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
 	tsoCounter.WithLabelValues("sync").Inc()
 
+	t.setLease(lease)
+
+	failpoint.Inject("delaySyncTimestamp", func() {
+		time.Sleep(time.Second)
+	})
+
 	last, err := t.loadTimestamp()
 	if err != nil {
 		return err
@@ -148,9 +160,6 @@ func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
 	current := &atomicObject{
 		physical: next,
 	}
-	t.mu.Lock()
-	t.lease = lease
-	t.mu.Unlock()
 	atomic.StorePointer(&t.ts, unsafe.Pointer(current))
 
 	return nil
@@ -264,6 +273,7 @@ func (t *TimestampOracle) ResetTimestamp() {
 		physical: typeutil.ZeroTime,
 	}
 	atomic.StorePointer(&t.ts, unsafe.Pointer(zero))
+	t.setLease(nil)
 }
 
 var maxRetryCount = 10
@@ -285,9 +295,11 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 		if current == nil || current.physical == typeutil.ZeroTime {
 			// If it's leader, maybe SyncTimestamp hasn't completed yet
 			if t.checkLease() {
+				log.Info("sync hasn't completed yet, wait for a while")
 				time.Sleep(200 * time.Millisecond)
 				continue
 			}
+			log.Error("invalid timestamp", zap.Any("timestamp", current))
 			return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
 		}
 
diff --git a/tests/cluster.go b/tests/cluster.go
index 80e2da00200..e50e1fcb50a 100644
--- a/tests/cluster.go
+++ b/tests/cluster.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/pd/v4/server/core"
 	"github.com/pingcap/pd/v4/server/id"
 	"github.com/pingcap/pd/v4/server/join"
+	"github.com/pingcap/pd/v4/server/member"
 	"github.com/pkg/errors"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
@@ -328,6 +329,19 @@ func (s *TestServer) BootstrapCluster() error {
 	return nil
 }
 
+// WaitLease is used to get the leader lease.
+// If the maximum number of retries is exceeded, it returns nil.
+func (s *TestServer) WaitLease() *member.LeaderLease {
+	for i := 0; i < 100; i++ {
+		lease := s.server.GetLease()
+		if lease != nil {
+			return lease
+		}
+		time.Sleep(WaitLeaderCheckInterval)
+	}
+	return nil
+}
+
 // TestCluster is only for test.
 type TestCluster struct {
 	config *clusterConfig
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 4fec5331ffd..93f1d6d361d 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -210,6 +210,53 @@ func (s *testTsoSuite) TestRequestFollower(c *C) {
 	c.Assert(time.Since(start), Less, time.Second)
 }
 
+// In some cases, when a TSO request arrives, SyncTimestamp may not have finished yet.
+// This test simulates that situation and verifies that the retry mechanism works.
+func (s *testTsoSuite) TestDelaySyncTimestamp(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 2)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+
+	var leaderServer, nextLeaderServer *tests.TestServer
+	leaderServer = cluster.GetServer(cluster.GetLeader())
+	c.Assert(leaderServer, NotNil)
+	for _, s := range cluster.GetServers() {
+		if s.GetConfig().Name != cluster.GetLeader() {
+			nextLeaderServer = s
+		}
+	}
+	c.Assert(nextLeaderServer, NotNil)
+
+	grpcPDClient := testutil.MustNewGrpcClient(c, nextLeaderServer.GetAddr())
+	clusterID := nextLeaderServer.GetClusterID()
+	req := &pdpb.TsoRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Count:  1,
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	c.Assert(failpoint.Enable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp", `return(true)`), IsNil)
+
+	// Make the old leader resign and wait for the new leader to get a lease
+	leaderServer.ResignLeader()
+	c.Assert(nextLeaderServer.WaitLease(), NotNil)
+
+	tsoClient, err := grpcPDClient.Tso(ctx)
+	c.Assert(err, IsNil)
+	defer tsoClient.CloseSend()
+	err = tsoClient.Send(req)
+	c.Assert(err, IsNil)
+	resp, err := tsoClient.Recv()
+	c.Assert(err, IsNil)
+	c.Assert(resp.GetCount(), Equals, uint32(1))
+	failpoint.Disable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp")
+}
+
 var _ = Suite(&testTimeFallBackSuite{})
 
 type testTimeFallBackSuite struct {
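
For reference, the retry behavior this patch adds to GetRespTS can be sketched outside of PD as a small standalone Go program. This is only a minimal sketch of the pattern, not PD code: the oracle type, hasLease, and getTimestamp names below are illustrative assumptions, with the lease reduced to a boolean and the timestamp to an atomic.Value. It shows the idea the patch relies on: while the leader still holds its lease but the timestamp sync has not finished, a request waits and retries instead of failing immediately; once the lease is gone, it fails fast so the client can ask another server.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// oracle is a hypothetical, much-simplified stand-in for TimestampOracle:
// ts stays nil until a "sync" completes, and lease reports whether this
// process still believes it is the leader.
type oracle struct {
	mu    sync.RWMutex
	lease bool         // true while the simplified leader lease is held
	ts    atomic.Value // holds a time.Time once the sync has finished
}

func (o *oracle) hasLease() bool {
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.lease
}

// getTimestamp mimics the retry loop: if the timestamp is not ready but the
// lease is still held, assume the sync is in flight and retry after a short
// sleep; if the lease is gone, fail fast.
func (o *oracle) getTimestamp() (time.Time, error) {
	const maxRetry = 10
	for i := 0; i < maxRetry; i++ {
		if v := o.ts.Load(); v != nil {
			return v.(time.Time), nil
		}
		if o.hasLease() {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		return time.Time{}, errors.New("can not get timestamp, may be not leader")
	}
	return time.Time{}, errors.New("sync did not finish in time")
}

func main() {
	o := &oracle{lease: true}

	// Simulate a slow sync, like the delaySyncTimestamp failpoint does:
	// the timestamp only becomes available one second after leadership
	// is acquired.
	go func() {
		time.Sleep(time.Second)
		o.ts.Store(time.Now())
	}()

	ts, err := o.getTimestamp()
	fmt.Println(ts, err) // succeeds after a few retries instead of erroring out
}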