Skip to content

Commit

Permalink
VReplication: Address SwitchTraffic bugs around replication lag and c…
Browse files Browse the repository at this point in the history
…ancel on error (#17616)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Jan 28, 2025
1 parent fd1186c commit 39a0ddd
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
tablets := make(map[string]*cluster.VttabletProcess)
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
if tablet.Vttablet.GetTabletStatus() == "SERVING" {
if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
tablets[tablet.Name] = tablet.Vttablet
}
Expand Down
146 changes: 146 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,11 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
workflow, workflowType)

// Now let's confirm that it works as expected with an error.
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
Expand Down Expand Up @@ -1057,6 +1062,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))

tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
var sourceTablets, targetTablets []*cluster.VttabletProcess

// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
for _, tablet := range tablets {
Expand All @@ -1069,9 +1075,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
targetTablets = append(targetTablets, tab)
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
sourceTablets = append(sourceTablets, tab)
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
Expand All @@ -1085,6 +1093,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
if dryRunResultSwitchWrites != nil {
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
}
if tableName == "customer" {
testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
}
// Now let's confirm that it works as expected with an error.
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
for tabletName, count := range counts {
Expand Down Expand Up @@ -1656,6 +1668,140 @@ func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspac
})
}

// testSwitchWritesErrorHandling confirms that switching writes works as expected
// in the face of vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
// The workflow MUST be migrating the customer table from the source to the
// target keyspace AND the workflow must currently have reads switched but not
// writes.
func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
t.Run("validate switch writes error handling", func(t *testing.T) {
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
require.NotZero(t, len(sourceTablets), "no source tablets provided")
require.NotZero(t, len(targetTablets), "no target tablets provided")
sourceKs := sourceTablets[0].Keyspace
targetKs := targetTablets[0].Keyspace
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
var err error
sourceConns := make([]*mysql.Conn, len(sourceTablets))
for i, tablet := range sourceTablets {
sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer sourceConns[i].Close()
}
targetConns := make([]*mysql.Conn, len(targetTablets))
for i, tablet := range targetTablets {
targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer targetConns[i].Close()
}
startingTestRowID := 10000000
numTestRows := 100
addTestRows := func() {
for i := 0; i < numTestRows; i++ {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
startingTestRowID+i))
}
}
deleteTestRows := func() {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
}
addIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "set session sql_mode=''")
execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
}
}
dropIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "alter table customer drop index name_idx")
}
}
lockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "lock table customer read")
}
}
unlockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "unlock tables")
}
}
cleanupTestData := func() {
dropIndex()
deleteTestRows()
}
restartWorkflow := func() {
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the prechecks work as expected. We ALTER
// the table on the target shards to add a unique index on the name
// field.
addIndex()
// Then we replicate some test rows across the target shards by
// inserting them in the source keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for twice the max lag duration that we
// will set for the SwitchTraffic call.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 3)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
// It should fail in the canSwitch() precheck.
require.Error(t, err)
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
cleanupTestData()
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
restartWorkflow()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
// to a fraction (6s) of the default max repl lag duration (30s). First
// we lock the customer table on the target tablets so that we cannot
// apply the INSERTs and catch up.
lockTargetTable()
addTestRows()
timeout := lagDuration * 2 // 6s
// Use the default max-replication-lag-allowed value of 30s.
// We run the command in a goroutine so that we can unblock things
// after the timeout is reached -- as the vplayer query is blocking
// on the table lock in the MySQL layer.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
}()
time.Sleep(timeout)
// Now we can unblock things and let it continue.
unlockTargetTable()
wg.Wait()
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
deleteTestRows()
waitForTargetToCatchup()
})
}

// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
Expand Down
45 changes: 31 additions & 14 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2920,8 +2920,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError("failed to migrate the workflow streams", err)
}
if cancel {
sw.cancelMigration(ctx, sm)
return 0, sw.logs(), nil
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return 0, sw.logs(), err
}

// We stop writes on the source before stopping the source streams so that the catchup time
Expand All @@ -2933,7 +2935,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

Expand All @@ -2951,7 +2955,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

Expand All @@ -2961,7 +2967,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// the tablet's deny list check and the first mysqld side table lock.
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
}
// No need to UNLOCK the tables as the connection was closed once the locks were acquired
Expand All @@ -2982,7 +2990,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to sync up replication between the source and target", err)
}

Expand All @@ -2991,7 +3001,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Migrating streams")
if err := sw.migrateStreams(ctx, sm); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to migrate the workflow streams", err)
}

Expand All @@ -3000,7 +3012,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to reset the sequences", err)
}

Expand All @@ -3009,7 +3023,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to create the reverse vreplication streams", err)
}

Expand All @@ -3024,7 +3040,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
Expand Down Expand Up @@ -3074,15 +3092,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
if err != nil {
return "", err
}
if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
}
for _, stream := range wf.ShardStreams {
for _, st := range stream.GetStreams() {
if st.Message == Frozen {
return cannotSwitchFrozen, nil
}
// If no new events have been replicated after the copy phase then it will be 0.
if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
}
switch st.State {
case binlogdatapb.VReplicationWorkflowState_Copying.String():
return cannotSwitchCopyIncomplete, nil
Expand Down
14 changes: 11 additions & 3 deletions go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
}

// CancelStreamMigrations cancels the stream migrations.
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
if sm.streams == nil {
return
return nil
}
errs := &concurrency.AllErrorRecorder{}

_ = sm.deleteTargetStreams(ctx)
if err := sm.deleteTargetStreams(ctx); err != nil {
errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
}

// Restart the source streams, but leave the Reshard workflow's reverse
// variant stopped.
Expand All @@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
return err
})
if err != nil {
errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
}
if errs.HasErrors() {
return errs.AggrError(vterrors.Aggregate)
}
return nil
}

// MigrateStreams migrates N streams
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
return sm.StopStreams(ctx)
}

func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
r.ts.cancelMigration(ctx, sm)
func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
return r.ts.cancelMigration(ctx, sm)
}

func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
return nil, nil
}

func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
dr.drLog.Log("Cancel migration as requested")
return nil
}

func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type iswitcher interface {
lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator)
cancelMigration(ctx context.Context, sm *StreamMigrator) error
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
Expand Down
Loading

0 comments on commit 39a0ddd

Please sign in to comment.