diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 53598cc5ec37..075ea6a53ec2 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -307,9 +307,10 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro r := region.GetRegionEpoch() o := origin.GetRegionEpoch() // Region meta is stale, return an error. - if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { + if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() || region.GetTerm() < origin.GetTerm() { return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) } + return origin, nil } diff --git a/server/core/region.go b/server/core/region.go index 45c979a905f0..3e7875707170 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -31,6 +31,7 @@ import ( // RegionInfo records detail region info. // Read-Only once created. type RegionInfo struct { + term uint64 meta *metapb.Region learners []*metapb.Peer voters []*metapb.Peer @@ -90,6 +91,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo { } region := &RegionInfo{ + term: heartbeat.GetTerm(), meta: heartbeat.GetRegion(), leader: heartbeat.GetLeader(), downPeers: heartbeat.GetDownPeers(), @@ -141,6 +143,11 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo { return region } +// GetTerm returns the current term of the region +func (r *RegionInfo) GetTerm() uint64 { + return r.term +} + // GetLearners returns the learners. func (r *RegionInfo) GetLearners() []*metapb.Peer { return r.learners diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 3277dae442bb..840161443526 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1022,3 +1022,72 @@ func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) { c.Assert(oc.AddOperator(op), IsFalse) c.Assert(oc.RemoveOperator(op), IsFalse) } + +func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { + tc, err := tests.NewTestCluster(s.ctx, 1) + defer tc.Destroy() + c.Assert(err, IsNil) + + err = tc.RunInitialServers() + c.Assert(err, IsNil) + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(c, clusterID, grpcPDClient, "127.0.0.1:0") + storeAddrs := []string{"127.0.1.1:0", "127.0.1.1:1", "127.0.1.1:2"} + rc := leaderServer.GetRaftCluster() + c.Assert(rc, NotNil) + rc.SetStorage(core.NewStorage(kv.NewMemoryKV())) + var peers []*metapb.Peer + id := leaderServer.GetAllocator() + for _, addr := range storeAddrs { + storeID, err := id.Alloc() + c.Assert(err, IsNil) + peerID, err := id.Alloc() + c.Assert(err, IsNil) + store := newMetaStore(storeID, addr, "2.1.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", storeID)) + _, err = putStore(c, grpcPDClient, clusterID, store) + c.Assert(err, IsNil) + peers = append(peers, &metapb.Peer{ + Id: peerID, + StoreId: storeID, + }) + } + + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(clusterID), + Region: &metapb.Region{ + Id: 1, + Peers: peers, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + }, + Leader: peers[0], + Term: 0, + ApproximateSize: 10, + } + + region := core.RegionFromHeartbeat(regionReq) + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + + // Transfer leader + regionReq.Term = 1 + regionReq.Leader = peers[1] + region = core.RegionFromHeartbeat(regionReq) + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + + // Stale heartbeat, update check should fail + regionReq.Term = 0 + regionReq.Leader = peers[0] + region = core.RegionFromHeartbeat(regionReq) + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, NotNil) +}