Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Optimize mysql consistency #121

Merged
merged 9 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/log"
"github.com/pingcap/dumpling/v4/log"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/spf13/pflag"
"go.uber.org/zap"
Expand Down
36 changes: 36 additions & 0 deletions v4/export/connectionsPool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package export

import (
"context"
"database/sql"
)

type connectionsPool struct {
conns chan *sql.Conn
}

func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
connectPool := &connectionsPool{
conns: make(chan *sql.Conn, n),
}
for i := 0; i < n; i++ {
conn, err := createConnWithConsistency(ctx, pool)
if err != nil {
return nil, err
}
connectPool.releaseConn(conn)
}
return connectPool, nil
}

func (r *connectionsPool) getConn() *sql.Conn {
return <-r.conns
}

func (r *connectionsPool) releaseConn(conn *sql.Conn) {
select {
case r.conns <- conn:
default:
panic("put a redundant conn")
}
}
128 changes: 82 additions & 46 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}

if conf.Snapshot == "" && (doPdGC || conf.Consistency == "snapshot") {
conf.Snapshot, err = getSnapshot(pool)
conn, err := pool.Conn(ctx)
if err != nil {
return withStack(err)
}
conf.Snapshot, err = getSnapshot(conn)
if err != nil {
return err
}
conn.Close()
}

if conf.Snapshot != "" {
Expand Down Expand Up @@ -100,9 +105,10 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
"After dumping: run sql `update mysql.tidb set VARIABLE_VALUE = '10m' where VARIABLE_NAME = 'tikv_gc_life_time';` in tidb.\n")
}

pool, err = resetDBWithSessionParams(pool, conf.getDSN(""), conf.SessionParams)
if err != nil {
return err
if newPool, err := resetDBWithSessionParams(pool, conf.getDSN(""), conf.SessionParams); err != nil {
return withStack(err)
} else {
pool = newPool
}

m := newGlobalMetadata(conf.OutputDirPath)
Expand All @@ -112,14 +118,19 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
if conf.Consistency == "lock" {
conn, err := createConnWithConsistency(ctx, pool)
if err != nil {
return err
}
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
if err = prepareTableListToDump(conf, conn); err != nil {
return err
}
conn.Close()
}

conCtrl, err := NewConsistencyController(conf, pool)
Expand All @@ -130,17 +141,28 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

connectPool, err := newConnectionsPool(ctx, conf.Threads, pool)
if err != nil {
return err
}

if err = conCtrl.TearDown(); err != nil {
return err
}

// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
// for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
if conf.Consistency != "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
conn := connectPool.getConn()
err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
if err = prepareTableListToDump(conf, conn); err != nil {
return err
}
connectPool.releaseConn(conn)
}

var writer Writer
Expand All @@ -155,24 +177,26 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}

if conf.Sql == "" {
if err = dumpDatabases(ctx, conf, pool, writer); err != nil {
if err = dumpDatabases(ctx, conf, connectPool, writer); err != nil {
return err
}
} else {
if err = dumpSql(ctx, conf, pool, writer); err != nil {
if err = dumpSql(ctx, conf, connectPool, writer); err != nil {
return err
}
}

m.recordFinishTime(time.Now())

return conCtrl.TearDown()
return nil
}

func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
func dumpDatabases(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
allTables := conf.Tables
var g errgroup.Group
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(db, dbName)
conn := connectPool.getConn()
createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
connectPool.releaseConn(conn)
if err != nil {
return err
}
Expand All @@ -183,24 +207,35 @@ func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer)
if len(tables) == 0 {
continue
}
rateLimit := newRateLimit(conf.Threads)
var g errgroup.Group
for _, table := range tables {
table := table
g.Go(func() error {
rateLimit.getToken()
defer rateLimit.putToken()
return dumpTable(ctx, conf, db, dbName, table, writer)
})
}
if err := g.Wait(); err != nil {
return err
conn := connectPool.getConn()
tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
if err != nil {
return err
}
connectPool.releaseConn(conn)
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
err := tableIR.Start(ctx, conn)
if err != nil {
return err
}
return writer.WriteTableData(ctx, tableIR)
})
}
}
}
if err := g.Wait(); err != nil {
return err
}
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.DB) error {
func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
Expand All @@ -223,54 +258,56 @@ func prepareTableListToDump(conf *Config, pool *sql.DB) error {
return nil
}

func dumpSql(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
tableIR, err := SelectFromSql(conf, db)
func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
conn := connectPool.getConn()
tableIR, err := SelectFromSql(conf, conn)
connectPool.releaseConn(conn)
if err != nil {
return err
}

return writer.WriteTableData(ctx, tableIR)
}

func dumpTable(ctx context.Context, conf *Config, db *sql.DB, dbName string, table *TableInfo, writer Writer) error {
func dumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, table *TableInfo, writer Writer) ([]TableDataIR, error) {
tableName := table.Name
if !conf.NoSchemas {
if table.Type == TableTypeView {
viewName := table.Name
createViewSQL, err := ShowCreateView(db, dbName, viewName)
if err != nil {
return err
return nil, err
}
return writer.WriteTableMeta(ctx, dbName, viewName, createViewSQL)
return nil, writer.WriteTableMeta(ctx, dbName, viewName, createViewSQL)
}
createTableSQL, err := ShowCreateTable(db, dbName, tableName)
if err != nil {
return err
return nil, err
}
if err := writer.WriteTableMeta(ctx, dbName, tableName, createTableSQL); err != nil {
return err
return nil, err
}
}
// Do not dump table data and return nil
if conf.NoData {
return nil
return nil, nil
}

if conf.Rows != UnspecifiedSize {
finished, err := concurrentDumpTable(ctx, writer, conf, db, dbName, tableName)
finished, chunksIterArray, err := concurrentDumpTable(ctx, conf, db, dbName, tableName)
if err != nil || finished {
return err
return chunksIterArray, err
}
}
tableIR, err := SelectAllFromTable(conf, db, dbName, tableName)
if err != nil {
return err
return nil, err
}

return writer.WriteTableData(ctx, tableIR)
return []TableDataIR{tableIR}, nil
}

func concurrentDumpTable(ctx context.Context, writer Writer, conf *Config, db *sql.DB, dbName string, tableName string) (bool, error) {
func concurrentDumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, tableName string) (bool, []TableDataIR, error) {
// try dump table concurrently by split table to chunks
chunksIterCh := make(chan TableDataIR, defaultDumpThreads)
errCh := make(chan error, defaultDumpThreads)
Expand All @@ -279,6 +316,7 @@ func concurrentDumpTable(ctx context.Context, writer Writer, conf *Config, db *s
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
var g errgroup.Group
chunksIterArray := make([]TableDataIR, 0)
g.Go(func() error {
splitTableDataIntoChunks(ctx1, chunksIterCh, errCh, linear, dbName, tableName, db, conf)
return nil
Expand All @@ -288,24 +326,22 @@ Loop:
for {
select {
case <-ctx.Done():
return true, nil
return true, chunksIterArray, nil
case <-linear:
return false, nil
return false, chunksIterArray, nil
case chunksIter, ok := <-chunksIterCh:
if !ok {
break Loop
}
g.Go(func() error {
return writer.WriteTableData(ctx, chunksIter)
})
chunksIterArray = append(chunksIterArray, chunksIter)
case err := <-errCh:
return false, err
return false, chunksIterArray, err
}
}
if err := g.Wait(); err != nil {
return true, err
return true, chunksIterArray, err
}
return true, nil
return true, chunksIterArray, nil
}

func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, snapshotTS uint64) {
Expand Down
30 changes: 27 additions & 3 deletions v4/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package export

import (
"context"
"database/sql"
"fmt"
"strconv"

Expand All @@ -27,6 +28,14 @@ func newMockWriter() *mockWriter {
}
}

func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)
connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
connectPool.releaseConn(conn)
return connectPool
}

func (m *mockWriter) WriteDatabaseMeta(ctx context.Context, db, createSQL string) error {
m.databaseMeta[db] = createSQL
return nil
Expand Down Expand Up @@ -64,7 +73,8 @@ func (s *testDumpSuite) TestDumpDatabase(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpDatabases(context.Background(), mockConfig, db, mockWriter)
connectPool := newMockConnectPool(c, db)
err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter)
c.Assert(err, IsNil)

c.Assert(len(mockWriter.databaseMeta), Equals, 1)
Expand All @@ -78,6 +88,8 @@ func (s *testDumpSuite) TestDumpTable(c *C) {
mockConfig.SortByPk = false
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

showCreateTableResult := "CREATE TABLE t (a INT)"
rows := mock.NewRows([]string{"Table", "Create Table"}).AddRow("t", showCreateTableResult)
Expand All @@ -90,8 +102,13 @@ func (s *testDumpSuite) TestDumpTable(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpTable(context.Background(), mockConfig, db, "test", &TableInfo{Name: "t"}, mockWriter)
ctx := context.Background()
tableIRArray, err := dumpTable(ctx, mockConfig, conn, "test", &TableInfo{Name: "t"}, mockWriter)
c.Assert(err, IsNil)
for _, tableIR := range tableIRArray {
c.Assert(tableIR.Start(ctx, conn), IsNil)
c.Assert(mockWriter.WriteTableData(ctx, tableIR), IsNil)
}

c.Assert(mockWriter.tableMeta["test.t"], Equals, showCreateTableResult)
c.Assert(len(mockWriter.tableData), Equals, 1)
Expand Down Expand Up @@ -121,6 +138,8 @@ func (s *testDumpSuite) TestDumpTableWhereClause(c *C) {
mockConfig.Where = "a > 3 and a < 9"
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

showCreateTableResult := "CREATE TABLE t (a INT)"
rows := mock.NewRows([]string{"Table", "Create Table"}).AddRow("t", showCreateTableResult)
Expand All @@ -137,8 +156,13 @@ func (s *testDumpSuite) TestDumpTableWhereClause(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t` WHERE a > 3 and a < 9").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpTable(context.Background(), mockConfig, db, "test", &TableInfo{Name: "t"}, mockWriter)
ctx := context.Background()
tableIRArray, err := dumpTable(ctx, mockConfig, conn, "test", &TableInfo{Name: "t"}, mockWriter)
c.Assert(err, IsNil)
for _, tableIR := range tableIRArray {
c.Assert(tableIR.Start(ctx, conn), IsNil)
c.Assert(mockWriter.WriteTableData(ctx, tableIR), IsNil)
}

c.Assert(mockWriter.tableMeta["test.t"], Equals, showCreateTableResult)
c.Assert(len(mockWriter.tableData), Equals, 1)
Expand Down
Loading