Skip to content

Commit

Permalink
cherry pick #2675 to release-4.0 (#2678)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: Haizhi Geng <[email protected]>
  • Loading branch information
ti-srebot and JmPotato authored Jul 27, 2020
1 parent 096ab27 commit 2f2447a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

// leader lease
lease *member.LeaderLease
leaseMu sync.RWMutex

member *member.Member
// etcd client
client *clientv3.Client
Expand Down Expand Up @@ -640,6 +644,20 @@ func (s *Server) GetMember() *member.Member {
return s.member
}

// GetLease returns the lease of member and only leader server's lease is not nil.
func (s *Server) GetLease() *member.LeaderLease {
s.leaseMu.RLock()
defer s.leaseMu.RUnlock()
return s.lease
}

// SetLease changes the lease.
func (s *Server) SetLease(lease *member.LeaderLease) {
s.leaseMu.Lock()
defer s.leaseMu.Unlock()
s.lease = lease
}

// GetStorage returns the backend storage of server.
func (s *Server) GetStorage() *core.Storage {
return s.storage
Expand Down Expand Up @@ -1101,6 +1119,8 @@ func (s *Server) campaignLeader() {
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go lease.KeepAlive(ctx)
s.SetLease(lease)
defer s.SetLease(nil)
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))

log.Debug("sync timestamp for tso")
Expand Down
18 changes: 15 additions & 3 deletions server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (t *TimestampOracle) checkLease() bool {
return t.lease != nil && !t.lease.IsExpired()
}

func (t *TimestampOracle) setLease(lease *member.LeaderLease) {
t.mu.Lock()
defer t.mu.Unlock()
t.lease = lease
}

// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
// otherwise, update it.
func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
Expand All @@ -119,6 +125,12 @@ func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
tsoCounter.WithLabelValues("sync").Inc()

t.setLease(lease)

failpoint.Inject("delaySyncTimestamp", func() {
time.Sleep(time.Second)
})

last, err := t.loadTimestamp()
if err != nil {
return err
Expand Down Expand Up @@ -148,9 +160,6 @@ func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
current := &atomicObject{
physical: next,
}
t.mu.Lock()
t.lease = lease
t.mu.Unlock()
atomic.StorePointer(&t.ts, unsafe.Pointer(current))

return nil
Expand Down Expand Up @@ -264,6 +273,7 @@ func (t *TimestampOracle) ResetTimestamp() {
physical: typeutil.ZeroTime,
}
atomic.StorePointer(&t.ts, unsafe.Pointer(zero))
t.setLease(nil)
}

var maxRetryCount = 10
Expand All @@ -285,9 +295,11 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
if current == nil || current.physical == typeutil.ZeroTime {
// If it's leader, maybe SyncTimestamp hasn't completed yet
if t.checkLease() {
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
log.Error("invalid timestamp", zap.Any("timestamp", current))
return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
}

Expand Down
14 changes: 14 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/id"
"github.com/pingcap/pd/v4/server/join"
"github.com/pingcap/pd/v4/server/member"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -328,6 +329,19 @@ func (s *TestServer) BootstrapCluster() error {
return nil
}

// WaitLease is used to get leader lease.
// If it exceeds the maximum number of loops, it will return nil.
func (s *TestServer) WaitLease() *member.LeaderLease {
for i := 0; i < 100; i++ {
lease := s.server.GetLease()
if lease != nil {
return lease
}
time.Sleep(WaitLeaderCheckInterval)
}
return nil
}

// TestCluster is only for test.
type TestCluster struct {
config *clusterConfig
Expand Down
47 changes: 47 additions & 0 deletions tests/server/tso/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,53 @@ func (s *testTsoSuite) TestRequestFollower(c *C) {
c.Assert(time.Since(start), Less, time.Second)
}

// In some cases, when a TSO request arrives, the SyncTimestamp may not finish yet.
// This test is used to simulate this situation and verify that the retry mechanism.
func (s *testTsoSuite) TestDeplaySyncTimestamp(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

var leaderServer, nextLeaderServer *tests.TestServer
leaderServer = cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer, NotNil)
for _, s := range cluster.GetServers() {
if s.GetConfig().Name != cluster.GetLeader() {
nextLeaderServer = s
}
}
c.Assert(nextLeaderServer, NotNil)

grpcPDClient := testutil.MustNewGrpcClient(c, nextLeaderServer.GetAddr())
clusterID := nextLeaderServer.GetClusterID()
req := &pdpb.TsoRequest{
Header: testutil.NewRequestHeader(clusterID),
Count: 1,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c.Assert(failpoint.Enable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp", `return(true)`), IsNil)

// Make the old leader resign and wait for the new leader to get a lease
leaderServer.ResignLeader()
c.Assert(nextLeaderServer.WaitLease(), NotNil)

tsoClient, err := grpcPDClient.Tso(ctx)
c.Assert(err, IsNil)
defer tsoClient.CloseSend()
err = tsoClient.Send(req)
c.Assert(err, IsNil)
resp, err := tsoClient.Recv()
c.Assert(err, IsNil)
c.Assert(resp.GetCount(), Equals, uint32(1))
failpoint.Disable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp")
}

var _ = Suite(&testTimeFallBackSuite{})

type testTimeFallBackSuite struct {
Expand Down

0 comments on commit 2f2447a

Please sign in to comment.