Merge branch 'release-6.5' into disksing/dr-replicate-2
disksing authored Aug 2, 2023
2 parents 1255a7d + 2892b46 commit dc56b82
Showing 20 changed files with 339 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -148,7 +148,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
- @ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
+ @ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

5 changes: 4 additions & 1 deletion cmd/pd-server/main.go
@@ -93,7 +93,10 @@ func main() {

// Creates server.
ctx, cancel := context.WithCancel(context.Background())
- serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, swaggerserver.NewHandler, autoscaling.NewHandler}
+ serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, autoscaling.NewHandler}
+ if swaggerserver.Enabled() {
+ 	serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
+ }
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
29 changes: 0 additions & 29 deletions pkg/swaggerserver/swagger_handler.go

This file was deleted.

12 changes: 11 additions & 1 deletion pkg/swaggerserver/swaggerserver.go
@@ -12,12 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+ //go:build swagger_server
+ // +build swagger_server

package swaggerserver

import (
"context"
"net/http"

httpSwagger "github.com/swaggo/http-swagger"
_ "github.com/tikv/pd/docs/swagger"
"github.com/tikv/pd/server"
)

@@ -32,9 +37,14 @@
}
)

+ // Enabled returns true if the swagger server is enabled.
+ func Enabled() bool {
+ 	return true
+ }
+
// NewHandler creates a HTTP handler for Swagger.
func NewHandler(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
swaggerHandler := http.NewServeMux()
- swaggerHandler.Handle(swaggerPrefix, handler())
+ swaggerHandler.Handle(swaggerPrefix, httpSwagger.Handler())
return swaggerHandler, swaggerServiceGroup, nil
}
@@ -1,4 +1,4 @@
- // Copyright 2020 TiKV Project Authors.
+ // Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -18,12 +18,18 @@
package swaggerserver

import (
"io"
"context"
"net/http"

"github.com/tikv/pd/server"
)

- func handler() http.Handler {
- 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- 		_, _ = io.WriteString(w, "Swagger UI is not built. Try `make` without `SWAGGER=1`.\n")
- 	})
+ // Enabled returns false if the swagger server is disabled.
+ func Enabled() bool {
+ 	return false
+ }

+ // NewHandler creates a HTTP handler for Swagger.
+ func NewHandler(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
+ 	return nil, server.ServiceGroup{}, nil
}
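For context, the two swaggerserver variants above form a compile-time toggle: the file guarded by the swagger_server build tag reports Enabled() as true and wires up the real Swagger handler, while the fallback file reports false and returns an empty handler, so cmd/pd-server/main.go can skip registering the route. Below is a minimal standalone sketch of the same build-tag pattern; the package, file, and tag names are illustrative and not taken from this repository.

// file: enabled_on.go — compiled only when the tag is set (go build -tags demofeature)
//go:build demofeature

package feature

// Enabled reports that the optional feature was compiled in.
func Enabled() bool { return true }

// file: enabled_off.go — the default build, no tag required
//go:build !demofeature

package feature

// Enabled reports that the optional feature was left out of this build.
func Enabled() bool { return false }

Callers branch on Enabled() at startup, and binaries built without the tag exclude the tagged file entirely, including heavyweight imports such as the generated swagger docs.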
28 changes: 17 additions & 11 deletions server/api/operator_test.go
@@ -217,7 +217,13 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
regionURL := fmt.Sprintf("%s/operators/%d", suite.urlPrefix, region.GetId())
operator := mustReadURL(re, regionURL)
suite.Contains(operator, "operator not found")

+ convertStepsToStr := func(steps []string) string {
+ 	stepStrs := make([]string, len(steps))
+ 	for i := range steps {
+ 		stepStrs[i] = fmt.Sprintf("%d:{%s}", i, steps[i])
+ 	}
+ 	return strings.Join(stepStrs, ", ")
+ }
testCases := []struct {
name string
placementRuleEnable bool
@@ -231,25 +237,25 @@
placementRuleEnable: false,
input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3]}`),
expectedError: nil,
- expectSteps: strings.Join([]string{
+ expectSteps: convertStepsToStr([]string{
pdoperator.AddLearner{ToStore: 3, PeerID: 1}.String(),
pdoperator.PromoteLearner{ToStore: 3, PeerID: 1}.String(),
pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
}, ", "),
}),
},
{
name: "placement rule disable with peer role",
placementRuleEnable: false,
input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
expectedError: nil,
- expectSteps: strings.Join([]string{
+ expectSteps: convertStepsToStr([]string{
pdoperator.AddLearner{ToStore: 3, PeerID: 2}.String(),
pdoperator.PromoteLearner{ToStore: 3, PeerID: 2}.String(),
pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
pdoperator.RemovePeer{FromStore: 1, PeerID: 2}.String(),
pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(),
}, ", "),
}),
},
{
name: "default placement rule without peer role",
@@ -262,13 +268,13 @@
name: "default placement rule with peer role",
placementRuleEnable: true,
input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
- expectSteps: strings.Join([]string{
+ expectSteps: convertStepsToStr([]string{
pdoperator.AddLearner{ToStore: 3, PeerID: 3}.String(),
pdoperator.PromoteLearner{ToStore: 3, PeerID: 3}.String(),
pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(),
}, ", "),
}),
},
{
name: "default placement rule with invalid input",
@@ -323,12 +329,12 @@
},
input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
expectedError: nil,
- expectSteps: strings.Join([]string{
+ expectSteps: convertStepsToStr([]string{
pdoperator.AddLearner{ToStore: 3, PeerID: 5}.String(),
pdoperator.PromoteLearner{ToStore: 3, PeerID: 5}.String(),
pdoperator.TransferLeader{FromStore: 1, ToStore: 3}.String(),
pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
}, ", "),
}),
},
{
name: "customized placement rule with valid peer role2",
@@ -363,12 +369,12 @@
},
input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["leader", "follower"]}`),
expectedError: nil,
- expectSteps: strings.Join([]string{
+ expectSteps: convertStepsToStr([]string{
pdoperator.AddLearner{ToStore: 3, PeerID: 6}.String(),
pdoperator.PromoteLearner{ToStore: 3, PeerID: 6}.String(),
pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
}, ", "),
}),
},
}
for _, testCase := range testCases {
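The test refactor above swaps the plain strings.Join of expected operator steps for a helper that also prefixes each step with its index. Here is a self-contained sketch of the same formatting, runnable outside the test suite; the step descriptions are made-up placeholders, not real operator output.

package main

import (
	"fmt"
	"strings"
)

// convertStepsToStr mirrors the helper added to the test: every expected step
// is rendered as "<index>:{<step>}" and the pieces are joined with ", ".
func convertStepsToStr(steps []string) string {
	stepStrs := make([]string, len(steps))
	for i := range steps {
		stepStrs[i] = fmt.Sprintf("%d:{%s}", i, steps[i])
	}
	return strings.Join(stepStrs, ", ")
}

func main() {
	fmt.Println(convertStepsToStr([]string{
		"add learner peer 1 on store 3",
		"transfer leader from store 1 to store 2",
	}))
	// Prints: 0:{add learner peer 1 on store 3}, 1:{transfer leader from store 1 to store 2}
}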
27 changes: 22 additions & 5 deletions server/cluster/coordinator.go
@@ -494,11 +494,28 @@ func (c *coordinator) getHotRegionsByType(typ statistics.RWType) *statistics.Sto
default:
}
// update params `IsLearner` and `LastUpdateTime`
- for _, stores := range []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} {
- 	for _, store := range stores {
- 		for _, hotPeer := range store.Stats {
- 			region := c.cluster.GetRegion(hotPeer.RegionID)
- 			hotPeer.UpdateHotPeerStatShow(region)
+ s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer}
+ for i, stores := range s {
+ 	for j, store := range stores {
+ 		for k := range store.Stats {
+ 			h := &s[i][j].Stats[k]
+ 			region := c.cluster.GetRegion(h.RegionID)
+ 			if region != nil {
+ 				h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID))
+ 			}
+ 			switch typ {
+ 			case statistics.Write:
+ 				if region != nil {
+ 					h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0)
+ 				}
+ 			case statistics.Read:
+ 				store := c.cluster.GetStore(h.StoreID)
+ 				if store != nil {
+ 					ts := store.GetMeta().GetLastHeartbeat()
+ 					h.LastUpdateTime = time.Unix(ts/1e9, ts%1e9)
+ 				}
+ 			default:
+ 			}
}
}
}
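One detail in the Read branch above: the store's last heartbeat is a Unix timestamp in nanoseconds, so it is split into whole seconds and a nanosecond remainder before being handed to time.Unix. A tiny standalone illustration of that conversion (not PD code) follows.

package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Now().UnixNano() // stands in for a last-heartbeat timestamp in nanoseconds
	// Same split as in the hunk above: whole seconds, then the leftover nanoseconds.
	lastUpdate := time.Unix(ts/1e9, ts%1e9)
	fmt.Println(lastUpdate)
}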
3 changes: 0 additions & 3 deletions server/cluster/unsafe_recovery_controller.go
@@ -706,9 +706,6 @@ func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*meta

var failedPeers []*metapb.Peer
for _, peer := range region.Peers {
- if peer.Role == metapb.PeerRole_Learner || peer.Role == metapb.PeerRole_DemotingVoter {
- 	continue
- }
if u.isFailed(peer) {
failedPeers = append(failedPeers, peer)
}
42 changes: 42 additions & 0 deletions server/cluster/unsafe_recovery_controller_test.go
@@ -606,6 +606,48 @@ func TestAutoDetectMode(t *testing.T) {
}
}

// A failed learner replica/store should be considered by auto-detect recovery.
func TestAutoDetectModeWithOneLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
for _, store := range newTestStores(1, "6.0.0") {
re.NoError(cluster.PutStore(store.GetMeta()))
}
recoveryController := newUnsafeRecoveryController(cluster)
re.NoError(recoveryController.RemoveFailedStores(nil, 60, true))

storeReport := pdpb.StoreReport{
PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
RegionState: &raft_serverpb.RegionLocalState{
Region: &metapb.Region{
Id: 1001,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1}, {Id: 12, StoreId: 2}, {Id: 13, StoreId: 3, Role: metapb.PeerRole_Learner}}}}},
},
}
req := newStoreHeartbeat(1, &storeReport)
req.StoreReport.Step = 1
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
hasStore3AsFailedStore := false
for _, failedStore := range resp.RecoveryPlan.ForceLeader.FailedStores {
if failedStore == 3 {
hasStore3AsFailedStore = true
break
}
}
re.True(hasStore3AsFailedStore)
}

func TestOneLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
10 changes: 8 additions & 2 deletions server/election/leadership.go
@@ -16,6 +16,7 @@ package election

import (
"context"
"sync"
"sync/atomic"

"github.com/pingcap/failpoint"
@@ -54,8 +55,9 @@ type Leadership struct {
leaderKey string
leaderValue string

- 	keepAliveCtx        context.Context
- 	keepAliveCancelFunc context.CancelFunc
+ 	keepAliveCtx            context.Context
+ 	keepAliveCancelFunc     context.CancelFunc
+ 	keepAliveCancelFuncLock sync.Mutex
}

// NewLeadership creates a new Leadership.
@@ -137,7 +139,9 @@ func (ls *Leadership) Keep(ctx context.Context) {
if ls == nil {
return
}
+ ls.keepAliveCancelFuncLock.Lock()
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx)
+ ls.keepAliveCancelFuncLock.Unlock()
go ls.getLease().KeepAlive(ls.keepAliveCtx)
}

@@ -230,8 +234,10 @@ func (ls *Leadership) Reset() {
if ls == nil || ls.getLease() == nil {
return
}
+ ls.keepAliveCancelFuncLock.Lock()
if ls.keepAliveCancelFunc != nil {
	ls.keepAliveCancelFunc()
}
+ ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
}