Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lightning: optimize the estimated size #41943

Merged
merged 18 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ import (
"golang.org/x/exp/maps"
)

// compressionRatio is the estimated TiKV/TiFlash compression ratio.
const compressionRatio = float64(1) / 3

// EstimateSourceDataSizeResult is the object for estimated data size result.
type EstimateSourceDataSizeResult struct {
// SizeWithIndex is the size with the index.
// SizeWithIndex is the tikv size with the index.
SizeWithIndex int64
// SizeWithoutIndex is the size without the index.
// SizeWithoutIndex is the tikv size without the index.
SizeWithoutIndex int64
// HasUnsortedBigTables indicates whether the source data has unsorted big tables or not.
HasUnsortedBigTables bool
// TiFlashSize is the size of tiflash.
TiFlashSize int64
}

// PreImportInfoGetter defines the operations to get information from sources and target.
Expand Down Expand Up @@ -512,11 +517,16 @@ func (p *PreImportInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, op
if result != nil && !getPreInfoCfg.ForceReloadCache {
return result, nil
}
sizeWithIndex := int64(0)
sourceTotalSize := int64(0)
tableCount := 0
unSortedBigTableCount := 0
errMgr := errormanager.New(nil, p.cfg, log.FromContext(ctx))

var (
sizeWithIndex = int64(0)
tiflashSize = int64(0)
sourceTotalSize = int64(0)
tableCount = 0
unSortedBigTableCount = 0
errMgr = errormanager.New(nil, p.cfg, log.FromContext(ctx))
)

dbInfos, err := p.GetAllTableStructures(ctx)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -528,13 +538,14 @@ func (p *PreImportInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, op
continue
}
for _, tbl := range db.Tables {
// todo(okJiang): sourceTotalSize seems same as
sourceTotalSize += tbl.TotalSize
tableInfo, ok := info.Tables[tbl.Name]
if ok {
tableSize := tbl.TotalSize
// Do not sample small tables because there may be a large number of them, and it would take a long
// time to sample data for all of them.
if isTiDBBackend(p.cfg) || tbl.TotalSize < int64(config.SplitRegionSize) {
sizeWithIndex += tbl.TotalSize
tbl.IndexRatio = 1.0
tbl.IsRowOrdered = false
} else {
Expand All @@ -545,26 +556,38 @@ func (p *PreImportInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, op
tbl.IndexRatio = sampledIndexRatio
tbl.IsRowOrdered = isRowOrderedFromSample

if tbl.IndexRatio > 0 {
sizeWithIndex += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
} else {
// if sample data failed due to max-error, fallback to use source size
sizeWithIndex += tbl.TotalSize
}
tableSize = int64(float64(tbl.TotalSize) * tbl.IndexRatio)

if tbl.TotalSize > int64(config.DefaultBatchSize)*2 && !tbl.IsRowOrdered {
unSortedBigTableCount++
}
}

sizeWithIndex += tableSize
if tableInfo.Core.TiFlashReplica != nil && tableInfo.Core.TiFlashReplica.Available {
tiflashSize += tableSize * int64(tableInfo.Core.TiFlashReplica.Count)
}
tableCount += 1
}
}
}

replConfig, err := p.targetInfoGetter.GetReplicationConfig(ctx)
if err != nil {
return nil, errors.Trace(err)
}
sizeWithIndex *= int64(replConfig.MaxReplicas)

if isLocalBackend(p.cfg) {
sizeWithIndex = int64(float64(sizeWithIndex) * compressionRatio)
tiflashSize = int64(float64(tiflashSize) * compressionRatio)
}

result = &EstimateSourceDataSizeResult{
SizeWithIndex: sizeWithIndex,
SizeWithoutIndex: sourceTotalSize,
HasUnsortedBigTables: (unSortedBigTableCount > 0),
TiFlashSize: tiflashSize,
}
p.estimatedSizeCache = result
return result, nil
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestGetPreInfoEstimateSourceSize(t *testing.T) {
sizeResult, err := ig.EstimateSourceDataSize(ctx)
require.NoError(t, err)
t.Logf("estimate size: %v, file size: %v, has unsorted table: %v\n", sizeResult.SizeWithIndex, sizeResult.SizeWithoutIndex, sizeResult.HasUnsortedBigTables)
require.GreaterOrEqual(t, sizeResult.SizeWithIndex, sizeResult.SizeWithoutIndex)
require.GreaterOrEqual(t, sizeResult.SizeWithIndex, int64(float64(sizeResult.SizeWithoutIndex)*compressionRatio))
require.Equal(t, int64(len(testData)), sizeResult.SizeWithoutIndex)
require.False(t, sizeResult.HasUnsortedBigTables)
}
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ const (
status VARCHAR(32) NOT NULL,
state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish',
source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
cluster_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
tikv_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
tiflash_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (task_id)
);`
)
Expand Down Expand Up @@ -1975,6 +1976,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return common.ErrCheckDataSource.Wrap(err).GenWithStackByArgs()
}
estimatedDataSizeWithIndex := estimatedSizeResult.SizeWithIndex
estimatedTiflashDataSize := estimatedSizeResult.TiFlashSize

// Do not import with too large concurrency because these data may be all unsorted.
if estimatedSizeResult.HasUnsortedBigTables {
Expand All @@ -1999,7 +2001,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
if !taskExist {
if err = rc.taskMgr.InitTask(ctx, estimatedDataSizeWithIndex); err != nil {
if err = rc.taskMgr.InitTask(ctx, estimatedDataSizeWithIndex, estimatedTiflashDataSize); err != nil {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
}
Expand Down
69 changes: 39 additions & 30 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func RemoveTableMetaByTableName(ctx context.Context, db *sql.DB, metaTable, tabl
}

type taskMetaMgr interface {
InitTask(ctx context.Context, source int64) error
InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error
CheckTaskExist(ctx context.Context) (bool, error)
// CheckTasksExclusively check all tasks exclusively. action is the function to check all tasks and returns the tasks
// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
Expand Down Expand Up @@ -576,27 +576,29 @@ func parseTaskMetaStatus(s string) (taskMetaStatus, error) {
}

type taskMeta struct {
taskID int64
pdCfgs string
status taskMetaStatus
state int
sourceBytes uint64
clusterAvail uint64
taskID int64
pdCfgs string
status taskMetaStatus
state int
tikvSourceBytes uint64
tiflashSourceBytes uint64
tikvAvail uint64
tiflashAvail uint64
}

type storedCfgs struct {
PauseCfg pdutil.ClusterConfig `json:"paused"`
RestoreCfg pdutil.ClusterConfig `json:"restore"`
}

func (m *dbTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
func (m *dbTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
exec := &common.SQLWithRetry{
DB: m.session,
Logger: log.FromContext(ctx),
}
// avoid override existing metadata if the meta is already inserted.
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), source, taskStateNormal)
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, tikv_source_bytes, tiflash_source_bytes) values (?, ?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), tikvSourceSize, tiflashSourceSize, taskStateNormal)
return errors.Trace(err)
}

Expand Down Expand Up @@ -655,7 +657,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName),
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail from %s FOR UPDATE", m.tableName),
)
if err != nil {
return errors.Annotate(err, "fetch task metas failed")
Expand All @@ -666,7 +668,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
for rows.Next() {
var task taskMeta
var statusValue string
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil {
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.tikvSourceBytes, &task.tiflashSourceBytes, &task.tikvAvail, &task.tiflashAvail); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
Expand All @@ -685,8 +687,8 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
}
for _, task := range newTasks {
// nolint:gosec
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil {
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.tikvSourceBytes, task.tiflashSourceBytes, task.tikvAvail, task.tiflashAvail); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -1007,7 +1009,7 @@ func (b noopMetaMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr {

type noopTaskMetaMgr struct{}

func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
func (m noopTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
return nil
}

Expand Down Expand Up @@ -1094,32 +1096,39 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr {
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
taskID int64
initialized bool
sourceBytes uint64
clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
m.sourceBytes = uint64(source)
pd *pdutil.PdController
taskID int64
initialized bool
tikvSourceBytes uint64
tiflashSourceBytes uint64
tikvAvail uint64
tiflashAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, tikvSourceSize, tiflashSourceSize int64) error {
m.tikvSourceBytes = uint64(tikvSourceSize)
m.tiflashSourceBytes = uint64(tiflashSourceSize)
m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
newTasks, err := action([]taskMeta{
{
taskID: m.taskID,
status: taskMetaStatusInitial,
sourceBytes: m.sourceBytes,
clusterAvail: m.clusterAvail,
taskID: m.taskID,
status: taskMetaStatusInitial,
tikvSourceBytes: m.tikvSourceBytes,
tiflashSourceBytes: m.tiflashSourceBytes,
tikvAvail: m.tikvAvail,
tiflashAvail: m.tiflashAvail,
},
})
for _, t := range newTasks {
if m.taskID == t.taskID {
m.sourceBytes = t.sourceBytes
m.clusterAvail = t.clusterAvail
m.tikvSourceBytes = t.tikvSourceBytes
m.tiflashSourceBytes = t.tiflashSourceBytes
m.tikvAvail = t.tikvAvail
m.tiflashAvail = t.tiflashAvail
}
}
return err
Expand Down
32 changes: 17 additions & 15 deletions br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,19 +381,19 @@ func TestCheckTasksExclusively(t *testing.T) {
s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';").
WillReturnResult(sqlmock.NewResult(int64(0), int64(0)))
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from `test`.`t1` FOR UPDATE").
WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "source_bytes", "cluster_avail"}).
AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0").
AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0"))

s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(0)).
s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail from `test`.`t1` FOR UPDATE").
WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "tikv_source_bytes", "tiflash_source_bytes", "tikv_avail", "tiflash_avail"}).
AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0").
AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0", "0", "0"))

s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(2048), uint64(0), uint64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(0)).
s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)\\E").
WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(3072), uint64(0), uint64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
s.mockDB.ExpectCommit()

Expand All @@ -409,7 +409,8 @@ func TestCheckTasksExclusively(t *testing.T) {
var newTasks []taskMeta
for j := 2; j < 4; j++ {
task := tasks[j]
task.sourceBytes = uint64(j * 1024)
task.tikvSourceBytes = uint64(j * 1024)
task.tiflashSourceBytes = uint64(j * 1024)
newTasks = append(newTasks, task)
}
return newTasks, nil
Expand Down Expand Up @@ -437,7 +438,7 @@ func TestSingleTaskMetaMgr(t *testing.T) {
require.NoError(t, err)
require.False(t, ok)

err = metaMgr.InitTask(context.Background(), 1<<30)
err = metaMgr.InitTask(context.Background(), 1<<30, 1<<30)
require.NoError(t, err)

ok, err = metaMgr.CheckTaskExist(context.Background())
Expand All @@ -446,7 +447,8 @@ func TestSingleTaskMetaMgr(t *testing.T) {

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
require.Len(t, tasks, 1)
require.Equal(t, uint64(1<<30), tasks[0].sourceBytes)
require.Equal(t, uint64(1<<30), tasks[0].tikvSourceBytes)
require.Equal(t, uint64(1<<30), tasks[0].tiflashSourceBytes)
return nil, nil
})
require.NoError(t, err)
Expand Down
Loading