[release-19.0] VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616) (#17642)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
vitess-bot[bot] and mattlord authored Jan 29, 2025
1 parent 86f2e79 commit c1374d1
Showing 8 changed files with 268 additions and 58 deletions.
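
The two bugs named in the title are exercised by the new end-to-end test below: SwitchTraffic must refuse to switch writes while replication lag exceeds --max-replication-lag-allowed, and it must cancel cleanly (with no "cancel migration failed" error) when the target cannot catch up before the command timeout. The following is a rough, hypothetical sketch of that control flow only, not the actual Vitess traffic-switcher code; the currentLag, waitForCatchup, and cancelMigration hooks are made up for illustration.

// Package sketch gives a rough, hypothetical outline of the SwitchTraffic
// behaviors exercised by the tests in this commit; it is not the actual
// Vitess traffic-switcher code.
package sketch

import (
	"context"
	"fmt"
	"time"
)

// switcher bundles made-up hooks standing in for the real switcher internals.
type switcher struct {
	currentLag      func() time.Duration            // current replication lag for the workflow
	waitForCatchup  func(ctx context.Context) error // block until the target has caught up
	cancelMigration func() error                    // undo the partial switch
}

// switchWrites sketches the two behaviors under test: a canSwitch-style lag
// precheck, and cancel-on-error when the target cannot catch up in time.
func (s *switcher) switchWrites(workflow string, maxAllowedLag, timeout time.Duration) error {
	// Precheck: refuse to switch while lag exceeds --max-replication-lag-allowed.
	if lag := s.currentLag(); lag > maxAllowedLag {
		return fmt.Errorf("cannot switch traffic for workflow %s at this time: replication lag %ds is higher than allowed lag %s",
			workflow, int(lag.Seconds()), maxAllowedLag)
	}

	// Writes are stopped on the source; wait, bounded by the command timeout,
	// for the target to fully catch up before finalizing the switch.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := s.waitForCatchup(ctx); err != nil {
		// Roll the switch back so the source keeps serving writes; report a
		// cancellation failure separately, since the tests assert its absence.
		if cerr := s.cancelMigration(); cerr != nil {
			return fmt.Errorf("failed to sync up replication between the source and target: %v; cancel migration failed: %v", err, cerr)
		}
		return fmt.Errorf("failed to sync up replication between the source and target: %v", err)
	}
	return nil
}

In the test below, the first branch is triggered by inducing a duplicate-key error on the target shards so that lag climbs, and the second by read-locking the customer table on the targets so the vplayer cannot apply rows before the timeout.
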
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
@@ -886,7 +886,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
tablets := make(map[string]*cluster.VttabletProcess)
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
if tablet.Vttablet.GetTabletStatus() == "SERVING" {
if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
tablets[tablet.Name] = tablet.Vttablet
}
148 changes: 147 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
@@ -762,11 +762,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchReads(t, workflowType, cellNames, ksWorkflow, false)
assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)

switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)

testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
workflow, workflowType)

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
// Now let's confirm that the switch works as expected without an error.
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
@@ -998,6 +1003,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))

tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
var sourceTablets, targetTablets []*cluster.VttabletProcess

// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
for _, tablet := range tablets {
@@ -1010,9 +1016,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
targetTablets = append(targetTablets, tab)
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
sourceTablets = append(sourceTablets, tab)
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
@@ -1026,6 +1034,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
if dryRunResultSwitchWrites != nil {
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
}
if tableName == "customer" {
testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
}
// Now let's confirm that the switch works as expected without an error.
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
for tabletName, count := range counts {
@@ -1534,6 +1546,140 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

// testSwitchWritesErrorHandling confirms that switching writes works as expected
// in the face of vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
// The workflow MUST be migrating the customer table from the source to the
// target keyspace AND the workflow must currently have reads switched but not
// writes.
func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
t.Run("validate switch writes error handling", func(t *testing.T) {
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
require.NotZero(t, len(sourceTablets), "no source tablets provided")
require.NotZero(t, len(targetTablets), "no target tablets provided")
sourceKs := sourceTablets[0].Keyspace
targetKs := targetTablets[0].Keyspace
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
var err error
sourceConns := make([]*mysql.Conn, len(sourceTablets))
for i, tablet := range sourceTablets {
sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer sourceConns[i].Close()
}
targetConns := make([]*mysql.Conn, len(targetTablets))
for i, tablet := range targetTablets {
targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer targetConns[i].Close()
}
startingTestRowID := 10000000
numTestRows := 100
addTestRows := func() {
for i := 0; i < numTestRows; i++ {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
startingTestRowID+i))
}
}
deleteTestRows := func() {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
}
addIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "set session sql_mode=''")
execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
}
}
dropIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "alter table customer drop index name_idx")
}
}
lockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "lock table customer read")
}
}
unlockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "unlock tables")
}
}
cleanupTestData := func() {
dropIndex()
deleteTestRows()
}
restartWorkflow := func() {
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the prechecks work as expected. We ALTER
// the table on the target shards to add a unique index on the name
// field.
addIndex()
// Then we replicate some test rows across the target shards by
// inserting them in the source keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for three times the max lag duration that we
// will set for the SwitchTraffic call.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 3)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
// It should fail in the canSwitch() precheck.
require.Error(t, err)
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
cleanupTestData()
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
restartWorkflow()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
// to a fraction (6s) of the default max repl lag duration (30s). First
// we lock the customer table on the target tablets so that we cannot
// apply the INSERTs and catch up.
lockTargetTable()
addTestRows()
timeout := lagDuration * 2 // 6s
// Use the default max-replication-lag-allowed value of 30s.
// We run the command in a goroutine so that we can unblock things
// after the timeout is reached -- as the vplayer query is blocking
// on the table lock in the MySQL layer.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
}()
time.Sleep(timeout)
// Now we can unblock things and let it continue.
unlockTargetTable()
wg.Wait()
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
deleteTestRows()
waitForTargetToCatchup()
})
}

// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {