Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
5kbpers committed Mar 17, 2020
1 parent 65e47ff commit 9a98ffc
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 35 deletions.
13 changes: 11 additions & 2 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ const (
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash UnexpectedStoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly UnexpectedStoreBehavior = 2
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
Expand All @@ -122,15 +125,21 @@ func GetAllTiKVStores(
j := 0
skipStore:
for _, store := range stores {
var isTiFlash bool
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
if unexpectedStoreBehavior == SkipTiFlash {
continue skipStore
} else if unexpectedStoreBehavior == ErrorOnTiFlash {
return nil, errors.Errorf(
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
return nil, errors.Errorf(
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
isTiFlash = true
}
}
if !isTiFlash && unexpectedStoreBehavior == TiFlashOnly {
continue skipStore
}
stores[j] = store
j++
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
{Id: 3},
{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
unexpectedStoreBehavior: TiFlashOnly,
expectedStores: map[uint64]int{2: 1, 5: 1},
},
}

for _, testCase := range testCases {
Expand Down
27 changes: 5 additions & 22 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, placementRules []p
}

for _, table := range tables {
err = rc.db.AlterTiflashReplica(rc.ctx, table, 0)
if err != nil {
return errors.Trace(err)
if table.TiFlashReplicas > 0 {
err = rc.db.AlterTiflashReplica(rc.ctx, table, 0)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
Expand All @@ -396,25 +398,6 @@ func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error {
return nil
}

// GetTiFlashStores returns an id list of tiflash stores.
func (rc *Client) GetTiFlashStores() ([]uint64, error) {
stores, err := rc.pdClient.GetAllStores(rc.ctx)
if err != nil {
return nil, err
}

tiflashStores := make([]uint64, 0)
for _, store := range stores {
for _, label := range store.GetLabels() {
if label.GetKey() == "engine" && label.GetValue() == "tiflash" {
tiflashStores = append(tiflashStores, store.GetId())
}
}
}

return tiflashStores, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
Expand Down
10 changes: 3 additions & 7 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
rejectStores []uint64,
rejectStores map[uint64]bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
Expand Down Expand Up @@ -147,13 +147,9 @@ SplitRegions:
}
if len(rejectStores) > 0 {
startTime = time.Now()
log.Info("start to wait for removing rejected stores", zap.Uint64s("rejectStores", rejectStores))
storeMap := make(map[uint64]bool)
for _, storeID := range rejectStores {
storeMap[storeID] = true
}
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores))
for _, region := range allRegions {
if !rs.waitForRemoveRejectStores(ctx, region, storeMap) {
if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) {
regionSplitter := NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, []uint64{}, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/summary"
)
Expand Down Expand Up @@ -327,12 +328,16 @@ func SplitRanges(
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
tiflashStores, err := client.GetTiFlashStores()
tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
storeMap := make(map[uint64]bool)
for _, store := range tiflashStores {
storeMap[store.GetId()] = true
}

return splitter.Split(ctx, ranges, rewriteRules, tiflashStores, func(keys [][]byte) {
return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/utils/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ func GetPlacementRules(pdAddr string, tlsConf *tls.Config) ([]placement.Rule, er
if err != nil {
return nil, errors.Trace(err)
}
if resp.StatusCode == 412 {
return []placement.Rule{}, nil
}
if resp.StatusCode != 200 {
return nil, errors.Errorf("get placement rules failed: resp=%v, err=%v", buf.String(), err)
return nil, errors.Errorf("get placement rules failed: resp=%v, err=%v, code=%d", buf.String(), err, resp.StatusCode)
}
var rules []placement.Rule
err = json.Unmarshal(buf.Bytes(), &rules)
Expand Down

0 comments on commit 9a98ffc

Please sign in to comment.