DDL: Skip collecting TiFlash status when TiFlash is down (#40872) #40887

Open
wants to merge 1 commit into base: release-6.1
12 changes: 12 additions & 0 deletions ddl/ddl_tiflash_api.go
@@ -365,6 +365,18 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContext
if err := d.UpdateTiFlashHTTPAddress(&s); err != nil {
}
}

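// Test hook: mark exactly one TiFlash store as Down so the poll loop
// exercises the new skip path. Go's random map iteration order makes the
// choice of store arbitrary.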
failpoint.Inject("OneTiFlashStoreDown", func() {
for storeID, store := range pollTiFlashContext.TiFlashStores {
store.Store.StateName = "Down"
pollTiFlashContext.TiFlashStores[storeID] = store
break
}
})
pollTiFlashContext.PollCounter++

// Start to process every table.
schema := d.GetInfoSchemaWithInterceptor(ctx)
20 changes: 20 additions & 0 deletions ddl/ddl_tiflash_test.go
@@ -1049,3 +1049,23 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
require.NotNil(t, pi)
require.Equal(t, len(pi.Definitions), 2)
}

func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown"))
}()

tk.MustExec("alter table ddltiflash set tiflash replica 1")
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
}
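A note on the failpoint calls above: in pingcap/failpoint, the Inject markers in this patch compile to no-ops until the failpoint-ctl tool rewrites them into failpoint.Eval calls, and a hook only fires once failpoint.Enable has registered an expression (here, return) under its name. Below is a minimal, self-contained sketch of that runtime behavior; the failpoint name "demo" is made up for illustration.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Before Enable, Eval reports the failpoint as inactive, so the
	// guarded hook body is skipped.
	if _, err := failpoint.Eval("demo"); err != nil {
		fmt.Println("disabled: normal code path")
	}

	// The test enables its failpoints with the expression `return`;
	// after that, Eval succeeds and the hook body runs.
	if err := failpoint.Enable("demo", "return"); err != nil {
		panic(err)
	}
	if _, err := failpoint.Eval("demo"); err == nil {
		fmt.Println("enabled: injected code path")
	}

	// Disable afterwards, mirroring the test's deferred cleanup.
	if err := failpoint.Disable("demo"); err != nil {
		panic(err)
	}
}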
97 changes: 97 additions & 0 deletions domain/infosync/tiflash_manager.go
@@ -31,6 +31,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
@@ -71,6 +72,102 @@ func (m *TiFlashPDPlacementManager) Close(ctx context.Context) {

}

func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tableID int64) (int, error) {
// regionReplica maps regionID -> peer count on this store; PD will not create two peers of the same region on one store
var flashPeerCount int
for _, store := range tiFlashStores {
regionReplica := make(map[int64]int)
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
failpoint.Inject("OneTiFlashStoreDown", func() {
if store.Store.StateName == "Down" {
err = errors.New("mock TiFlash down")
}
})
if err != nil {
logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
zap.Int64("tableID", tableID))
// Just skip Down, Offline, or Tombstone stores, because PD will migrate regions away from them.
if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" {
return 0, err
}
continue
}
flashPeerCount += len(regionReplica)
}
return flashPeerCount, nil
}
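// Illustrative helper (an editor's sketch, not from the patch): the skip
// rule above distilled into one predicate. An error from a store only
// aborts the count when the store is expected to be serving.
func shouldAbortOnStoreError(stateName string) bool {
	switch stateName {
	case "Up", "Disconnected":
		return true // store should be reachable; surface the error
	default:
		return false // Down/Offline/Tombstone: PD migrates regions away
	}
}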

// calculateTiFlashProgress calculates progress based on the region status from PD and TiFlash.
func calculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) {
var regionCount int
if err := GetTiFlashRegionCountFromPD(context.Background(), tableID, &regionCount); err != nil {
logutil.BgLogger().Error("Fail to get regionCount from PD.",
zap.Int64("tableID", tableID))
return 0, errors.Trace(err)
}

if regionCount == 0 {
logutil.BgLogger().Warn("region count from PD is 0.",
zap.Int64("tableID", tableID))
return 0, fmt.Errorf("region count from PD is 0")
}

tiflashPeerCount, err := getTiFlashPeerWithoutLagCount(tiFlashStores, tableID)
if err != nil {
logutil.BgLogger().Error("Fail to get peer count from TiFlash.",
zap.Int64("tableID", tableID))
return 0, errors.Trace(err)
}
progress := float64(tiflashPeerCount) / float64(regionCount*int(replicaCount))
if progress > 1 { // can happen while PD is balancing regions
logutil.BgLogger().Debug("TiFlash peer count exceeds expected count from PD; PD may be balancing.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
progress = 1
}
if progress < 1 {
logutil.BgLogger().Debug("TiFlash replica progress < 1.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
}
return progress, nil
}
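// Worked example of the progress formula above (illustrative numbers):
// with regionCount = 10 and replicaCount = 2, full replication needs
// 10 * 2 = 20 TiFlash peers; tiflashPeerCount = 15 then yields progress
// 15 / 20 = 0.75. Values above 1 can appear transiently while PD
// rebalances, which is why progress is clamped to 1.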

// CalculateTiFlashProgress calculates TiFlash replica progress.
func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) {
return calculateTiFlashProgress(tableID, replicaCount, tiFlashStores)
}

// UpdateTiFlashProgressCache updates tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) UpdateTiFlashProgressCache(tableID int64, progress float64) {
m.Lock()
defer m.Unlock()
m.tiflashProgressCache[tableID] = progress
}

// GetTiFlashProgressFromCache gets tiflash replica progress from tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) GetTiFlashProgressFromCache(tableID int64) (float64, bool) {
m.RLock()
defer m.RUnlock()
progress, ok := m.tiflashProgressCache[tableID]
return progress, ok
}

// DeleteTiFlashProgressFromCache deletes tiflash replica progress from tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) DeleteTiFlashProgressFromCache(tableID int64) {
m.Lock()
defer m.Unlock()
delete(m.tiflashProgressCache, tableID)
}

// CleanTiFlashProgressCache cleans the progress cache
func (m *TiFlashReplicaManagerCtx) CleanTiFlashProgressCache() {
m.Lock()
defer m.Unlock()
m.tiflashProgressCache = make(map[int64]float64)
}
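// Illustrative call sequence for the cache API above (the table ID 45 is
// made up):
//	m.UpdateTiFlashProgressCache(45, 0.5)        // record 50% progress
//	if p, ok := m.GetTiFlashProgressFromCache(45); ok { /* report p */ }
//	m.DeleteTiFlashProgressFromCache(45)         // drop a single table
//	m.CleanTiFlashProgressCache()                // reset everything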

// SetTiFlashGroupConfig sets the TiFlash rule group config
func (m *TiFlashPDPlacementManager) SetTiFlashGroupConfig(ctx context.Context) error {
res, err := doRequest(ctx,