Lightning: optimize the estimated size #41943

Merged Mar 14, 2023 (18 commits)

Changes from 8 commits
40 changes: 30 additions & 10 deletions br/pkg/lightning/restore/get_pre_info.go
@@ -54,6 +54,9 @@ import (
"golang.org/x/exp/maps"
)

// compressionRatio is the compression ratio of TiKV/TiFlash storage
const compressionRatio = float64(1) / 3
@okJiang (Member, Author) commented on Mar 6, 2023:

After asking the TiFlash team: for a single replica, the data size in TiKV is similar to the data size in TiFlash.
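To make the constant concrete, here is a minimal, self-contained sketch (not the PR's code; the sizes are illustrative) of how the local backend rescales an estimate by the 1/3 ratio, as in the hunk near the end of EstimateSourceDataSize below:

```go
package main

import "fmt"

// compressionRatio mirrors the constant added in this diff.
const compressionRatio = float64(1) / 3

func main() {
	sizeWithIndex := int64(3 << 30) // assume the sampled estimate is 3 GiB
	// Rescale by the ratio, as the isLocalBackend branch in this diff does.
	scaled := int64(float64(sizeWithIndex) / compressionRatio)
	fmt.Printf("raw estimate: %d bytes, rescaled: %d bytes\n", sizeWithIndex, scaled)
}
```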


// EstimateSourceDataSizeResult is the object for estimated data size result.
type EstimateSourceDataSizeResult struct {
// SizeWithIndex is the size with the index.
@@ -62,6 +65,9 @@ type EstimateSourceDataSizeResult struct {
SizeWithoutIndex int64
// HasUnsortedBigTables indicates whether the source data has unsorted big tables or not.
HasUnsortedBigTables bool
// TiFlashSize is the estimated data size stored in TiFlash.
// Note: TiFlashSize includes all TiFlash replicas.
TiFlashSize int64
}

// PreRestoreInfoGetter defines the operations to get information from sources and target.
@@ -512,11 +518,16 @@ func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, o
if result != nil && !getPreInfoCfg.ForceReloadCache {
return result, nil
}
sizeWithIndex := int64(0)
sourceTotalSize := int64(0)
tableCount := 0
unSortedBigTableCount := 0
errMgr := errormanager.New(nil, p.cfg, log.FromContext(ctx))

var (
sizeWithIndex = int64(0)
tiflashSize = int64(0)
sourceTotalSize = int64(0)
tableCount = 0
unSortedBigTableCount = 0
errMgr = errormanager.New(nil, p.cfg, log.FromContext(ctx))
)

dbInfos, err := p.GetAllTableStructures(ctx)
if err != nil {
return nil, errors.Trace(err)
@@ -531,10 +542,10 @@ func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, o
sourceTotalSize += tbl.TotalSize
tableInfo, ok := info.Tables[tbl.Name]
if ok {
tableSize := tbl.TotalSize
// Do not sample small tables because there may be a large number of them, and it would take a long
// time to sample data for all of them.
if isTiDBBackend(p.cfg) || tbl.TotalSize < int64(config.SplitRegionSize) {
sizeWithIndex += tbl.TotalSize
tbl.IndexRatio = 1.0
tbl.IsRowOrdered = false
} else {
@@ -545,26 +556,35 @@ func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, o
tbl.IndexRatio = sampledIndexRatio
tbl.IsRowOrdered = isRowOrderedFromSample

// if sample data failed due to max-error, fallback to use source size
if tbl.IndexRatio > 0 {
sizeWithIndex += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
} else {
// if sample data failed due to max-error, fallback to use source size
sizeWithIndex += tbl.TotalSize
tableSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
}

if tbl.TotalSize > int64(config.DefaultBatchSize)*2 && !tbl.IsRowOrdered {
unSortedBigTableCount++
}
}

sizeWithIndex += tableSize
if tableInfo.Core.TiFlashReplica != nil && tableInfo.Core.TiFlashReplica.Available {
tiflashSize += tableSize * int64(tableInfo.Core.TiFlashReplica.Count)
}
tableCount += 1
}
}
}

if isLocalBackend(p.cfg) {
sizeWithIndex = int64(float64(sizeWithIndex) / compressionRatio)
tiflashSize = int64(float64(tiflashSize) / compressionRatio)
}

result = &EstimateSourceDataSizeResult{
SizeWithIndex: sizeWithIndex,
SizeWithoutIndex: sourceTotalSize,
HasUnsortedBigTables: (unSortedBigTableCount > 0),
TiFlashSize: tiflashSize,
}
p.estimatedSizeCache = result
return result, nil
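Stripped of Lightning's surrounding types, the per-table estimation above reduces to roughly the following self-contained sketch (tableStat and its fields are stand-ins for illustration, not Lightning's actual API):

```go
package main

import "fmt"

// tableStat is a hypothetical stand-in for Lightning's table metadata.
type tableStat struct {
	totalSize       int64   // raw source bytes for the table
	indexRatio      float64 // sampled index-overhead ratio; <= 0 means sampling gave no usable ratio
	tiflashReplicas int64   // 0 when the table has no available TiFlash replica
}

// estimate mirrors the loop in EstimateSourceDataSize: start from the raw
// size, add the sampled index overhead when available, and scale the
// per-table size by the TiFlash replica count.
func estimate(tables []tableStat) (sizeWithIndex, tiflashSize int64) {
	for _, tbl := range tables {
		tableSize := tbl.totalSize
		if tbl.indexRatio > 0 {
			tableSize += int64(float64(tbl.totalSize) * tbl.indexRatio)
		}
		// indexRatio <= 0: fall back to the raw source size alone.
		sizeWithIndex += tableSize
		tiflashSize += tableSize * tbl.tiflashReplicas
	}
	return sizeWithIndex, tiflashSize
}

func main() {
	total, tiflash := estimate([]tableStat{
		{totalSize: 1 << 20, indexRatio: 0.5, tiflashReplicas: 2},
		{totalSize: 2 << 20, indexRatio: -1, tiflashReplicas: 0},
	})
	fmt.Println("sizeWithIndex:", total, "tiflashSize:", tiflash)
}
```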
69 changes: 39 additions & 30 deletions br/pkg/lightning/restore/meta_manager.go
@@ -506,7 +506,7 @@ func RemoveTableMetaByTableName(ctx context.Context, db *sql.DB, metaTable, tabl
}

type taskMetaMgr interface {
InitTask(ctx context.Context, source int64) error
InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error
CheckTaskExist(ctx context.Context) (bool, error)
// CheckTasksExclusively checks all tasks exclusively. action is the function that checks all tasks and returns the tasks
// that need updating or any new tasks. At most one Lightning instance can execute the action function at the same time.
@@ -579,27 +579,29 @@ func parseTaskMetaStatus(s string) (taskMetaStatus, error) {
}

type taskMeta struct {
taskID int64
pdCfgs string
status taskMetaStatus
state int
sourceBytes uint64
clusterAvail uint64
taskID int64
pdCfgs string
status taskMetaStatus
state int
tikvSourceBytes uint64
tiflashSourceBytes uint64
tikvAvail uint64
tiflashAvail uint64
}

type storedCfgs struct {
PauseCfg pdutil.ClusterConfig `json:"paused"`
RestoreCfg pdutil.ClusterConfig `json:"restore"`
}

func (m *dbTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
func (m *dbTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
exec := &common.SQLWithRetry{
DB: m.session,
Logger: log.FromContext(ctx),
}
// avoid overriding existing metadata if the meta has already been inserted.
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), source, taskStateNormal)
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, tikv_source_bytes, tiflash_source_bytes) values (?, ?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), tikvSourceSize, tiflashSourceSize, taskStateNormal)
return errors.Trace(err)
}
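For clarity, a minimal database/sql sketch of the new statement with columns, placeholders, and bound arguments aligned (the table name, status string, and state value are assumptions for illustration, not Lightning's actual constants):

```go
package taskmeta

import (
	"context"
	"database/sql"
	"fmt"
)

// initTask writes the initial task meta row; ON DUPLICATE KEY UPDATE only
// resets the state so existing metadata is not overwritten.
func initTask(ctx context.Context, db *sql.DB, taskID, tikvSize, tiflashSize int64) error {
	stmt := fmt.Sprintf(
		"INSERT INTO %s (task_id, status, tikv_source_bytes, tiflash_source_bytes)"+
			" VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE state = ?",
		"`lightning_metadata`.`task_meta`", // hypothetical table name
	)
	// "initialized" stands in for taskMetaStatusInitial.String(); 0 for taskStateNormal.
	_, err := db.ExecContext(ctx, stmt, taskID, "initialized", tikvSize, tiflashSize, 0)
	return err
}
```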

@@ -658,7 +660,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName),
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail from %s FOR UPDATE", m.tableName),
)
if err != nil {
return errors.Annotate(err, "fetch task metas failed")
@@ -669,7 +671,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
for rows.Next() {
var task taskMeta
var statusValue string
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil {
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.tikvSourceBytes, &task.tiflashSourceBytes, &task.tikvAvail, &task.tiflashAvail); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
@@ -688,8 +690,8 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
}
for _, task := range newTasks {
// nolint:gosec
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil {
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.tikvSourceBytes, task.tiflashSourceBytes, task.tikvAvail, task.tiflashAvail); err != nil {
return errors.Trace(err)
}
}
@@ -1010,7 +1012,7 @@ func (b noopMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {

type noopTaskMetaMgr struct{}

func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
func (m noopTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
return nil
}

@@ -1097,32 +1099,39 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
taskID int64
initialized bool
sourceBytes uint64
clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
m.sourceBytes = uint64(source)
pd *pdutil.PdController
taskID int64
initialized bool
tikvSourceBytes uint64
tiflashSourceBytes uint64
tikvAvail uint64
tiflashAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
m.tikvSourceBytes = uint64(tikvSourceSize)
m.tiflashSourceBytes = uint64(tiflashSourceSize)
m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
newTasks, err := action([]taskMeta{
{
taskID: m.taskID,
status: taskMetaStatusInitial,
sourceBytes: m.sourceBytes,
clusterAvail: m.clusterAvail,
taskID: m.taskID,
status: taskMetaStatusInitial,
tikvSourceBytes: m.tikvSourceBytes,
tiflashSourceBytes: m.tiflashSourceBytes,
tikvAvail: m.tikvAvail,
tiflashAvail: m.tiflashAvail,
},
})
for _, t := range newTasks {
if m.taskID == t.taskID {
m.sourceBytes = t.sourceBytes
m.clusterAvail = t.clusterAvail
m.tikvSourceBytes = t.tikvSourceBytes
m.tiflashSourceBytes = t.tiflashSourceBytes
m.tikvAvail = t.tikvAvail
m.tiflashAvail = t.tiflashAvail
}
}
return err
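The exclusive-check contract is easiest to see in isolation: the manager hands every row to the action while holding the lock, then persists only the rows the action returns. A stand-in sketch (the taskMeta fields mirror this diff; the SQL and locking are elided):

```go
package main

import "fmt"

// taskMeta is a stand-in mirroring the fields this diff adds.
type taskMeta struct {
	taskID             int64
	tikvSourceBytes    uint64
	tiflashSourceBytes uint64
	tikvAvail          uint64
	tiflashAvail       uint64
}

// checkTasksExclusively mimics dbTaskMetaMgr's contract without the SQL:
// hand a copy of every row to action, then merge back whatever it returns.
func checkTasksExclusively(tasks []taskMeta, action func([]taskMeta) ([]taskMeta, error)) ([]taskMeta, error) {
	updated, err := action(append([]taskMeta(nil), tasks...))
	if err != nil {
		return nil, err
	}
	byID := make(map[int64]taskMeta, len(updated))
	for _, t := range updated {
		byID[t.taskID] = t
	}
	for i, t := range tasks {
		if nt, ok := byID[t.taskID]; ok {
			tasks[i] = nt
		}
	}
	return tasks, nil
}

func main() {
	tasks := []taskMeta{{taskID: 1, tikvSourceBytes: 1 << 30, tiflashSourceBytes: 1 << 30}}
	tasks, _ = checkTasksExclusively(tasks, func(ts []taskMeta) ([]taskMeta, error) {
		ts[0].tikvAvail, ts[0].tiflashAvail = 4<<30, 2<<30 // record observed capacity
		return ts, nil
	})
	fmt.Printf("%+v\n", tasks[0])
}
```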
32 changes: 17 additions & 15 deletions br/pkg/lightning/restore/meta_manager_test.go
@@ -381,19 +381,19 @@ func TestCheckTasksExclusively(t *testing.T) {
s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';").
WillReturnResult(sqlmock.NewResult(int64(0), int64(0)))
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from `test`.`t1` FOR UPDATE").
WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "source_bytes", "cluster_avail"}).
AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0"))

s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(0)).
s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail from `test`.`t1` FOR UPDATE").
WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "tikv_source_bytes", "tiflash_source_bytes", "tikv_avail", "tiflash_avail"}).
AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0"))

s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(2048), uint64(0), uint64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(0)).
s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(3072), uint64(0), uint64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
s.mockDB.ExpectCommit()

@@ -409,7 +409,8 @@ func TestCheckTasksExclusively(t *testing.T) {
var newTasks []taskMeta
for j := 2; j < 4; j++ {
task := tasks[j]
task.sourceBytes = uint64(j * 1024)
task.tikvSourceBytes = uint64(j * 1024)
task.tiflashSourceBytes = uint64(j * 1024)
newTasks = append(newTasks, task)
}
return newTasks, nil
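As a sanity check, the 2048 and 3072 byte counts in the REPLACE expectations above come straight from this callback's j*1024 assignment (illustrative only):

```go
package main

import "fmt"

func main() {
	for j := 2; j < 4; j++ {
		// j == 2 -> 2048 (task 2's expected args), j == 3 -> 3072 (task 3's)
		fmt.Println(j, uint64(j*1024))
	}
}
```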
Expand Down Expand Up @@ -437,7 +438,7 @@ func TestSingleTaskMetaMgr(t *testing.T) {
require.NoError(t, err)
require.False(t, ok)

err = metaMgr.InitTask(context.Background(), 1<<30)
err = metaMgr.InitTask(context.Background(), 1<<30, 1<<30)
require.NoError(t, err)

ok, err = metaMgr.CheckTaskExist(context.Background())
@@ -446,7 +447,8 @@

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
require.Len(t, tasks, 1)
require.Equal(t, uint64(1<<30), tasks[0].sourceBytes)
require.Equal(t, uint64(1<<30), tasks[0].tikvSourceBytes)
require.Equal(t, uint64(1<<30), tasks[0].tiflashSourceBytes)
return nil, nil
})
require.NoError(t, err)