Skip to content

Commit

Permalink
Use unique rows in copy_state to support parallel replication (#11451)
Browse files Browse the repository at this point in the history
* Use multiple rows in copy_state to support parallel replication

Signed-off-by: Matt Lord <[email protected]>

* Fix test check and use dedicated select for tickers

Signed-off-by: Matt Lord <[email protected]>

* Fixing some unit tests

Signed-off-by: Matt Lord <[email protected]>

* Fix optimizeCopyStateTable

Signed-off-by: Matt Lord <[email protected]>

* Final 🙏 unit test fixes

Signed-off-by: Matt Lord <[email protected]>

* Correct places where we should be getting state for all tables

Signed-off-by: Matt Lord <[email protected]>

* Improve perf of copy state list query

Signed-off-by: Matt Lord <[email protected]>

* Minor changes after self review

Signed-off-by: Matt Lord <[email protected]>

* Do copy state optimization in background goroutine

Signed-off-by: Matt Lord <[email protected]>

* Use default warning log for failed background copy_state optimization work

Signed-off-by: Matt Lord <[email protected]>

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Oct 17, 2022
1 parent 62bb228 commit 927fc3f
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 72 deletions.
19 changes: 19 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,3 +541,22 @@ func getShardRoutingRules(t *testing.T) string {
output = strings.TrimSpace(output)
return output
}

// verifyCopyStateIsOptimized checks, via information_schema, that the
// _vt.copy_state table on the given tablet reports a data_free of 0 and an
// auto_increment of 1. The test fails immediately if any query errors out,
// if the stats query does not return exactly one row, or if either value
// differs from what is expected.
func verifyCopyStateIsOptimized(t *testing.T, tablet *cluster.VttabletProcess) {
	const statsQuery = "select data_free, auto_increment from information_schema.tables where table_schema='_vt' and table_name='copy_state'"

	// Refresh information_schema with the latest data before reading it.
	_, err := tablet.QueryTablet("analyze table _vt.copy_state", "", false)
	require.NoError(t, err)

	// Verify that there are no delete-marked rows and that the auto-inc
	// value was reset.
	qr, err := tablet.QueryTablet(statsQuery, "", false)
	require.NoError(t, err)
	require.NotNil(t, qr)
	require.Equal(t, 1, len(qr.Rows))

	// Columns are checked in select-list order: data_free, then auto_increment.
	expectations := []struct {
		column int
		want   int64
		msg    string
	}{
		{column: 0, want: int64(0), msg: "data_free should be 0"},
		{column: 1, want: int64(1), msg: "auto_increment should be 1"},
	}
	for _, exp := range expectations {
		got, err := qr.Rows[0][exp.column].ToInt64()
		require.NoError(t, err)
		require.Equal(t, exp.want, got, exp.msg)
	}
}
9 changes: 9 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ func testBasicVreplicationWorkflow(t *testing.T) {
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
tabletMap := vc.getVttabletsInKeyspace(t, defaultCell, "customer", topodatapb.TabletType_PRIMARY.String())
require.NotNil(t, tabletMap)
require.Greater(t, len(tabletMap), 0)
for _, tablet := range tabletMap {
verifyCopyStateIsOptimized(t, tablet)
}
})
}

func TestV2WorkflowsAcrossDBVersions(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI
span.Annotate("tablet_alias", tablet.AliasString())
span.Annotate("vrepl_id", id)

query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id)
query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)", id, id)
qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate b
return nil
}

query := fmt.Sprintf("select vrepl_id from _vt.copy_state where vrepl_id in %s", VReplicationStreams(tabletStreams).Values())
query := fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in %s", VReplicationStreams(tabletStreams).Values())
p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query)
switch {
case err != nil:
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,11 @@ func (wd *workflowDiffer) initVDiffTables(dbClient binlogplayer.DBClient) error
}
}
query := fmt.Sprintf(sqlGetAllTableRows, encodeString(wd.ct.vde.dbName), tableIn.String())
qr, err := dbClient.ExecuteFetch(query, -1)
isqr, err := dbClient.ExecuteFetch(query, -1)
if err != nil {
return err
}
for _, row := range qr.Named().Rows {
for _, row := range isqr.Named().Rows {
tableName, _ := row.ToString("table_name")
tableRows, _ := row.ToInt64("table_rows")

Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
table_name varbinary(128),
lastpk varbinary(2000),
primary key (vrepl_id, table_name))`

alterCopyState = `alter table _vt.copy_state
add column id bigint unsigned not null auto_increment first,
drop primary key, add primary key(id),
add key (vrepl_id, table_name)`
)

var withDDL *withddl.WithDDL
Expand All @@ -77,6 +82,7 @@ func init() {
allddls = append(allddls, binlogplayer.AlterVReplicationTable...)
allddls = append(allddls, createReshardingJournalTable, createCopyState)
allddls = append(allddls, createVReplicationLogTable)
allddls = append(allddls, alterCopyState)
withDDL = withddl.New(allddls)

withDDLInitialQueries = append(withDDLInitialQueries, binlogplayer.WithDDLInitialQueries...)
Expand All @@ -94,6 +100,10 @@ var waitRetryTime = 1 * time.Second
// How frequently vcopier will update _vt.vreplication rows_copied
var rowsCopiedUpdateInterval = 30 * time.Second

// How frequently vcopier will garbage collect old copy_state rows.
// By default, do it in between every 2nd and 3rd rows copied update.
var copyStateGCInterval = (rowsCopiedUpdateInterval * 3) - (rowsCopiedUpdateInterval / 2)

// Engine is the engine for handling vreplication.
type Engine struct {
// mu synchronizes isOpen, cancelRetry, controllers and wg.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func TestCreateDBAndTable(t *testing.T) {
"ALTER TABLE _vt.vreplication ADD COLUMN workflow_sub_type int NOT NULL DEFAULT 0",
"create table if not exists _vt.resharding_journal.*",
"create table if not exists _vt.copy_state.*",
"alter table _vt.copy_state.*",
}
for _, ddl := range ddls {
dbClient.ExpectRequestRE(ddl, &sqltypes.Result{}, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestExternalConnectorCopy(t *testing.T) {
expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,'a'), (2,'b')",
"/update _vt.copy_state",
"/insert into _vt.copy_state",
"commit",
"/delete from _vt.copy_state",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestExternalConnectorCopy(t *testing.T) {
expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab2(id,val) values (1,'a'), (2,'b')",
"/update _vt.copy_state",
"/insert into _vt.copy_state",
"commit",
"/delete from _vt.copy_state",
"/update _vt.vreplication set state='Running'",
Expand All @@ -121,7 +121,7 @@ func TestExternalConnectorCopy(t *testing.T) {
expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab3(id,val) values (1,'a'), (2,'b')",
"/update _vt.copy_state",
"/insert into _vt.copy_state",
"commit",
"/delete from _vt.copy_state",
"/update _vt.vreplication set state='Running'",
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ func TestMain(m *testing.M) {
return 1
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), alterCopyState); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createVReplicationLogTable); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
Expand Down
39 changes: 33 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
// primary key that was copied. A nil Result means that nothing has been copied.
// A table that was fully copied is removed from copyState.
func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error {
qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id))
qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state group by vrepl_id, table_name)", vc.vr.id))
if err != nil {
return err
}
Expand Down Expand Up @@ -222,9 +222,11 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma

rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval)
defer rowsCopiedTicker.Stop()
copyStateGCTicker := time.NewTicker(copyStateGCInterval)
defer copyStateGCTicker.Stop()

var pkfields []*querypb.Field
var updateCopyState *sqlparser.ParsedQuery
var addLatestCopyState *sqlparser.ParsedQuery
var bv map[string]*querypb.BindVariable
var sqlbuffer bytes2.Buffer

Expand All @@ -238,6 +240,31 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
return io.EOF
default:
}
select {
case <-copyStateGCTicker.C:
// Garbage collect older copy_state rows:
// - Using a goroutine so that we are not blocking the copy flow
// - Using a new connection so that we do not change the transactional behavior of the copy itself
// This helps to ensure that the table does not grow too large and the
// number of rows does not have a big impact on the queries used for
// the workflow.
go func() {
gcQuery := fmt.Sprintf("delete from _vt.copy_state where vrepl_id = %d and table_name = %s and id < (select maxid from (select max(id) as maxid from _vt.copy_state where vrepl_id = %d and table_name = %s) as depsel)",
vc.vr.id, encodeString(tableName), vc.vr.id, encodeString(tableName))
dbClient := vc.vr.vre.getDBClient(false)
if err := dbClient.Connect(); err != nil {
log.Errorf("Error while garbage collecting older copy_state rows, could not connect to database: %v", err)
return
}
defer dbClient.Close()
if _, err := dbClient.ExecuteFetch(gcQuery, -1); err != nil {
log.Errorf("Error while garbage collecting older copy_state rows with query %q: %v", gcQuery, err)
}
}()
case <-ctx.Done():
return io.EOF
default:
}
if rows.Throttled {
_ = vc.vr.updateTimeThrottled(RowStreamerComponentName)
return nil
Expand Down Expand Up @@ -270,8 +297,8 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
pkfields = append(pkfields, rows.Pkfields...)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("update _vt.copy_state set lastpk=%a where vrepl_id=%s and table_name=%s", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
updateCopyState = buf.ParsedQuery()
buf.Myprintf("insert into _vt.copy_state (lastpk, vrepl_id, table_name) values (%a, %s, %s)", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
addLatestCopyState = buf.ParsedQuery()
}
if len(rows.Rows) == 0 {
return nil
Expand Down Expand Up @@ -315,11 +342,11 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
Value: buf,
},
}
updateState, err := updateCopyState.GenerateQuery(bv, nil)
addNewState, err := addLatestCopyState.GenerateQuery(bv, nil)
if err != nil {
return err
}
if _, err := vc.vr.dbClient.Execute(updateState); err != nil {
if _, err := vc.vr.dbClient.Execute(addNewState); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 927fc3f

Please sign in to comment.