
Commit

Merge branch 'master' into multi-op
ti-chi-bot authored Jul 14, 2022
2 parents 28204c0 + 34a4cce commit 09fd67f
Showing 7 changed files with 157 additions and 18 deletions.
11 changes: 11 additions & 0 deletions server/api/admin.go
@@ -58,6 +58,17 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
rc.DropCacheAllRegion()
h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.")
}

// FIXME: details of input json body params
// @Tags admin
// @Summary Reset the ts.
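The new DeleteAllRegionCache handler mirrors the existing per-region DeleteRegionCache but drops every region from the server cache in one call. Below is a minimal sketch of invoking the endpoint from a Go client; the host, port, and the /pd/api/v1 prefix are assumptions based on PD defaults, not values taken from this commit.

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // Assumed PD client URL and API prefix; adjust to your deployment.
    url := "http://127.0.0.1:2379/pd/api/v1/admin/cache/regions"
    req, err := http.NewRequest(http.MethodDelete, url, nil)
    if err != nil {
        panic(err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // On success PD responds 200 with "All regions are removed from server cache."
    fmt.Println(resp.Status)
}
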
68 changes: 68 additions & 0 deletions server/api/admin_test.go
@@ -94,6 +94,74 @@ func (suite *adminTestSuite) TestDropRegion() {
suite.Equal(uint64(50), region.GetRegionEpoch().Version)
}

func (suite *adminTestSuite) TestDropRegions() {
cluster := suite.svr.GetRaftCluster()

n := uint64(10000)
np := uint64(3)

regions := make([]*core.RegionInfo, 0, n)
for i := uint64(0); i < n; i++ {
peers := make([]*metapb.Peer, 0, np)
for j := uint64(0); j < np; j++ {
peer := &metapb.Peer{
Id: i*np + j,
}
peer.StoreId = (i + j) % n
peers = append(peers, peer)
}
// initialize region's epoch to (100, 100).
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i))).Clone(
core.SetPeers(peers),
core.SetRegionConfVer(100),
core.SetRegionVersion(100),
)
regions = append(regions, region)

err := cluster.HandleRegionHeartbeat(region)
suite.NoError(err)
}

// Region epoch cannot decrease.
for i := uint64(0); i < n; i++ {
region := regions[i].Clone(
core.SetRegionConfVer(50),
core.SetRegionVersion(50),
)
regions[i] = region
err := cluster.HandleRegionHeartbeat(region)
suite.Error(err)
}

for i := uint64(0); i < n; i++ {
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i)))

suite.Equal(uint64(100), region.GetRegionEpoch().ConfVer)
suite.Equal(uint64(100), region.GetRegionEpoch().Version)
}

// After dropping all regions from the cache, a lower version is accepted.
url := fmt.Sprintf("%s/admin/cache/regions", suite.urlPrefix)
req, err := http.NewRequest(http.MethodDelete, url, nil)
suite.NoError(err)
res, err := testDialClient.Do(req)
suite.NoError(err)
suite.Equal(http.StatusOK, res.StatusCode)
res.Body.Close()

for _, region := range regions {
err := cluster.HandleRegionHeartbeat(region)
suite.NoError(err)
}

for i := uint64(0); i < n; i++ {
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i)))

suite.Equal(uint64(50), region.GetRegionEpoch().ConfVer)
suite.Equal(uint64(50), region.GetRegionEpoch().Version)
}
}

func (suite *adminTestSuite) TestPersistFile() {
data := []byte("#!/bin/sh\nrm -rf /")
re := suite.Require()
1 change: 1 addition & 0 deletions server/api/router.go
@@ -286,6 +286,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

adminHandler := newAdminHandler(svr, rd)
registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/admin/reset-ts", adminHandler.ResetTS, setMethods(http.MethodPost), setAuditBackend(localLog))
registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog))

59 changes: 43 additions & 16 deletions server/schedule/placement/fit.go
@@ -16,6 +16,7 @@ package placement

import (
"math"
"math/bits"
"sort"

"github.com/pingcap/kvproto/pkg/metapb"
@@ -227,34 +228,60 @@ func (w *fitWorker) fitRule(index int) bool {
if len(candidates) < count {
count = len(candidates)
}
return w.enumPeers(candidates, nil, index, count)

return w.fixRuleWithCandidates(candidates, index, count)
}

// Recursively traverses all feasible peer combinations.
// For each combination, call `compareBest` to determine whether it is better
// than the existing option.
// Pick the most suitable peer combination for the rule with candidates.
// Returns true if it replaces `bestFit` with a better alternative.
func (w *fitWorker) enumPeers(candidates, selected []*fitPeer, index int, count int) bool {
if len(selected) == count {
// We collect enough peers. End recursive.
return w.compareBest(selected, index)
}
func (w *fitWorker) fixRuleWithCandidates(candidates []*fitPeer, index int, count int) bool {
// map the candidates to binary numbers with len(candidates) bits:
// each bit is either 1 or 0, where 1 means the candidate is picked.
// A binary number with exactly `count` ones represents one choice of peers for the current rule.

var better bool
// make sure the left number of candidates should be enough.
indexLimit := len(candidates) - (count - len(selected))
for i := 0; i <= indexLimit; i++ {
p := candidates[i]
p.selected = true
better = w.enumPeers(candidates[i+1:], append(selected, p), index, count) || better
p.selected = false
limit := uint(1<<len(candidates) - 1)
binaryInt := uint(1<<count - 1)
for ; binaryInt <= limit; binaryInt++ {
// there should be exactly `count` ones in the current binary number `binaryInt`
if bits.OnesCount(binaryInt) != count {
continue
}
selected := pickPeersFromBinaryInt(candidates, binaryInt)
better = w.compareBest(selected, index) || better
// reset the selected items to false.
unSelectPeers(selected)
if w.exit {
break
}
}
return better
}

// pickPeersFromBinaryInt picks the candidates whose corresponding bit in `binaryNumber` is `1`.
// binaryNumber = 5, whose binary form is 101, returns {candidates[0], candidates[2]}
// binaryNumber = 6, whose binary form is 110, returns {candidates[1], candidates[2]}
func pickPeersFromBinaryInt(candidates []*fitPeer, binaryNumber uint) []*fitPeer {
selected := make([]*fitPeer, 0)
for _, p := range candidates {
if binaryNumber&1 == 1 {
p.selected = true
selected = append(selected, p)
}
binaryNumber >>= 1
if binaryNumber == 0 {
break
}
}
return selected
}

func unSelectPeers(selected []*fitPeer) {
for _, p := range selected {
p.selected = false
}
}

// compareBest checks if the selected peers are better than the previous best.
// Returns true if it replaces `bestFit` with a better alternative.
func (w *fitWorker) compareBest(selected []*fitPeer, index int) bool {
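The rewrite above swaps the recursive enumPeers enumeration for a bitmask walk: every integer from 2^count-1 up to 2^len(candidates)-1 whose population count equals `count` encodes one combination of peers, and pickPeersFromBinaryInt decodes the mask back into peers. Here is a small self-contained sketch of the same counting trick; the names and types are illustrative only, not taken from the diff.

package main

import (
    "fmt"
    "math/bits"
)

// combinations enumerates every way to pick `count` items out of `items` by
// walking all bitmasks with exactly `count` ones, the approach used by
// fixRuleWithCandidates/pickPeersFromBinaryInt above.
func combinations(items []string, count int) [][]string {
    var result [][]string
    limit := uint(1<<len(items)) - 1
    for mask := uint(1<<count) - 1; mask <= limit; mask++ {
        if bits.OnesCount(mask) != count {
            continue
        }
        picked := make([]string, 0, count)
        for i := range items {
            if mask&(1<<uint(i)) != 0 {
                picked = append(picked, items[i])
            }
        }
        result = append(result, picked)
    }
    return result
}

func main() {
    // Three candidate stores, pick two peers: expect [s1 s2] [s1 s3] [s2 s3].
    fmt.Println(combinations([]string{"s1", "s2", "s3"}, 2))
}
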
32 changes: 32 additions & 0 deletions server/schedule/placement/fit_test.go
@@ -187,3 +187,35 @@ func TestIsolationScore(t *testing.T) {
testCase.checker(score1, score2)
}
}

func TestPickPeersFromBinaryInt(t *testing.T) {
re := require.New(t)
var candidates []*fitPeer
for id := uint64(1); id <= 10; id++ {
candidates = append(candidates, &fitPeer{
Peer: &metapb.Peer{Id: id},
})
}
testCases := []struct {
binary string
expectedPeers []uint64
}{
{"0", []uint64{}},
{"1", []uint64{1}},
{"101", []uint64{1, 3}},
{"111", []uint64{1, 2, 3}},
{"1011", []uint64{1, 2, 4}},
{"100011", []uint64{1, 2, 6}},
{"1000001111", []uint64{1, 2, 3, 4, 10}},
}

for _, c := range testCases {
binaryNumber, err := strconv.ParseUint(c.binary, 2, 64)
re.NoError(err)
selected := pickPeersFromBinaryInt(candidates, uint(binaryNumber))
re.Len(selected, len(c.expectedPeers))
for id := 0; id < len(selected); id++ {
re.Equal(selected[id].Id, c.expectedPeers[id])
}
}
}
2 changes: 1 addition & 1 deletion server/schedulers/hot_region.go
@@ -556,7 +556,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer && bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
schedulerCounter.WithLabelValues(fmt.Sprintf("hot-region-%s", bs.rwTy), "hot_region_split").Inc()
schedulerCounter.WithLabelValues(bs.sche.GetName(), "need_split_before_move_peer").Inc()
continue
}
bs.cur.mainPeerStat = mainPeerStat
2 changes: 1 addition & 1 deletion server/tso/tso.go
@@ -123,7 +123,7 @@ func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int
// For example, we have three DCs: dc-1, dc-2 and dc-3. The bits of suffix is defined by
// the const suffixBits. Then, for dc-2, the suffix may be 1 because it's persisted
// in etcd with the value of 1.
// Once we get a noramal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's
// Once we get a normal TSO like this (18 bits): xxxxxxxxxxxxxxxxxx. We will make the TSO's
// low bits of logical part from each DC looks like:
// global: xxxxxxxxxx00000000
// dc-1: xxxxxxxxxx00000001
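The fixed comment above describes how a Local TSO reserves the low suffixBits bits of its logical part for a per-DC suffix persisted in etcd, so TSOs allocated by different DCs can never collide on the same logical value. Below is a minimal sketch of that encoding; encodeLogical and the fixed suffixBits value are hypothetical illustrations to match the 18-bit example in the comment, not PD's actual implementation.

package main

import "fmt"

// suffixBits is assumed to be 8 here purely to match the example above;
// PD derives the real value from the number of DC locations.
const suffixBits = 8

// encodeLogical is a hypothetical helper: it shifts the logical counter left to
// free the low suffixBits bits and stamps in the DC's suffix.
func encodeLogical(logical, suffix int64) int64 {
    return logical<<suffixBits | suffix
}

func main() {
    logical := int64(0x2A)
    fmt.Printf("global: %018b\n", encodeLogical(logical, 0)) // low bits 00000000
    fmt.Printf("dc-1:   %018b\n", encodeLogical(logical, 1)) // low bits 00000001
    fmt.Printf("dc-2:   %018b\n", encodeLogical(logical, 2)) // low bits 00000010
    fmt.Printf("dc-3:   %018b\n", encodeLogical(logical, 3)) // low bits 00000011
}
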
