From c7686793f119bbbd23f724e47283ad554e04f540 Mon Sep 17 00:00:00 2001
From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com>
Date: Tue, 28 Jan 2025 14:08:04 -0500
Subject: [PATCH 1/2] Cherry-pick 39a0ddde8f27431f890662d3de8c62fcda530f7e with
 conflicts

---
 go/test/endtoend/vreplication/cluster_test.go |   2 +-
 .../vreplication/vreplication_test.go         | 211 ++++++
 go/vt/vtctl/workflow/server.go                |  48 +-
 go/vt/vtctl/workflow/stream_migrator.go       |  14 +-
 go/vt/vtctl/workflow/switcher.go              |   4 +-
 go/vt/vtctl/workflow/switcher_dry_run.go      |   3 +-
 go/vt/vtctl/workflow/switcher_interface.go    |   5 +
 go/vt/vtctl/workflow/traffic_switcher.go      |  53 +-
 go/vt/vtctl/workflow/workflows.go             | 686 ++++++++++++++++++
 9 files changed, 992 insertions(+), 34 deletions(-)
 create mode 100644 go/vt/vtctl/workflow/workflows.go

diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index ddd323f7d3f..f611d76eb62 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -882,7 +882,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
 	tablets := make(map[string]*cluster.VttabletProcess)
 	for _, shard := range keyspace.Shards {
 		for _, tablet := range shard.Tablets {
-			if tablet.Vttablet.GetTabletStatus() == "SERVING" {
+			if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
 				log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
 				tablets[tablet.Name] = tablet.Vttablet
 			}
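
The filter above only returns tablets that are SERVING and, when a tablet type is requested, whose type matches case-insensitively (an empty type matches everything). A tiny standalone sketch of just that predicate, using illustrative field names rather than the real VttabletProcess API:

    // tablet_filter_sketch.go: the new predicate in isolation (field names are
    // illustrative, not the real VttabletProcess API).
    package main

    import (
    	"fmt"
    	"strings"
    )

    type tabletSketch struct {
    	status     string
    	tabletType string
    }

    // serves reports whether a tablet is SERVING and, when wantType is non-empty,
    // whether its type matches wantType case-insensitively.
    func serves(tab tabletSketch, wantType string) bool {
    	return tab.status == "SERVING" && (wantType == "" || strings.EqualFold(tab.tabletType, wantType))
    }

    func main() {
    	fmt.Println(serves(tabletSketch{"SERVING", "PRIMARY"}, "primary")) // true
    	fmt.Println(serves(tabletSketch{"SERVING", "REPLICA"}, "primary")) // false
    	fmt.Println(serves(tabletSketch{"NOT_SERVING", "PRIMARY"}, ""))    // false
    }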
diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index c3f3e4e6557..5f580660b6d 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -797,6 +797,19 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 			commit, _ = vc.startQuery(t, openTxQuery)
 		}
 		switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
+<<<<<<< HEAD
+=======
+		shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards))
+		for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
+			shardNames = append(shardNames, shardName)
+		}
+		testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
+
+		testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
+			workflow, workflowType)
+
+		// Now let's confirm that it works as expected with an error.
+>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 		switchWrites(t, workflowType, ksWorkflow, false)
 
 		checkThatVDiffFails(t, targetKs, workflow)
@@ -1028,6 +1041,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
 		require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))
 
 		tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
+		var sourceTablets, targetTablets []*cluster.VttabletProcess
 
 		// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
 		for _, tablet := range tablets {
@@ -1040,9 +1054,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
 		targetShards = "," + targetShards + ","
 		for _, tab := range tablets {
 			if strings.Contains(targetShards, ","+tab.Shard+",") {
+				targetTablets = append(targetTablets, tab)
 				log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
 				catchup(t, tab, workflow, "Reshard")
 			} else {
+				sourceTablets = append(sourceTablets, tab)
 				log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
 				continue
 			}
@@ -1056,6 +1072,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
 		if dryRunResultSwitchWrites != nil {
 			reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
 		}
+		if tableName == "customer" {
+			testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
+		}
+		// Now let's confirm that it works as expected with an error.
 		reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
 		reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
 		for tabletName, count := range counts {
@@ -1605,6 +1625,197 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
 	validateDryRunResults(t, output, dryRunResults)
 }
 
+<<<<<<< HEAD
+=======
+// testSwitchTrafficPermissionChecks confirms that for the SwitchTraffic command, the
+// necessary permissions are checked properly on the source keyspace's primary tablets.
+// This ensures that we can create and manage the reverse vreplication workflow.
+func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
+	applyPrivileges := func(query string) {
+		for _, shard := range sourceShards {
+			primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
+			_, err := primary.QueryTablet(query, primary.Keyspace, false)
+			require.NoError(t, err)
+		}
+	}
+	runDryRunCmd := func(expectErr bool) {
+		_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
+			"SwitchTraffic", "--tablet-types=primary", "--dry-run")
+		require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
+	}
+
+	defer func() {
+		// Put the default global privs back in place.
+		applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
+	}()
+
+	t.Run("test switch traffic permission checks", func(t *testing.T) {
+		t.Run("test without global privileges", func(t *testing.T) {
+			applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
+			runDryRunCmd(true)
+		})
+
+		t.Run("test with db level privileges", func(t *testing.T) {
+			applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
+				sidecarDBIdentifier))
+			runDryRunCmd(false)
+		})
+
+		t.Run("test without global or db level privileges", func(t *testing.T) {
+			applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
+				sidecarDBIdentifier))
+			runDryRunCmd(true)
+		})
+
+		t.Run("test with table level privileges", func(t *testing.T) {
+			applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
+				sidecarDBIdentifier))
+			runDryRunCmd(false)
+		})
+
+		t.Run("test without global, db, or table level privileges", func(t *testing.T) {
+			applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
+				sidecarDBIdentifier))
+			runDryRunCmd(true)
+		})
+	})
+}
+
+// testSwitchWritesErrorHandling confirms that switching writes works as expected
+// in the face of vreplication lag (canSwitch() precheck) and when canceling the
+// switch due to replication failing to catch up in time.
+// The workflow MUST be migrating the customer table from the source to the
+// target keyspace AND the workflow must currently have reads switched but not
+// writes.
+func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
+	t.Run("validate switch writes error handling", func(t *testing.T) {
+		vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+		defer vtgateConn.Close()
+		require.NotZero(t, len(sourceTablets), "no source tablets provided")
+		require.NotZero(t, len(targetTablets), "no target tablets provided")
+		sourceKs := sourceTablets[0].Keyspace
+		targetKs := targetTablets[0].Keyspace
+		ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
+		var err error
+		sourceConns := make([]*mysql.Conn, len(sourceTablets))
+		for i, tablet := range sourceTablets {
+			sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+			require.NoError(t, err)
+			defer sourceConns[i].Close()
+		}
+		targetConns := make([]*mysql.Conn, len(targetTablets))
+		for i, tablet := range targetTablets {
+			targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+			require.NoError(t, err)
+			defer targetConns[i].Close()
+		}
+		startingTestRowID := 10000000
+		numTestRows := 100
+		addTestRows := func() {
+			for i := 0; i < numTestRows; i++ {
+				execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
+					startingTestRowID+i))
+			}
+		}
+		deleteTestRows := func() {
+			execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
+		}
+		addIndex := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "set session sql_mode=''")
+				execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
+			}
+		}
+		dropIndex := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "alter table customer drop index name_idx")
+			}
+		}
+		lockTargetTable := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "lock table customer read")
+			}
+		}
+		unlockTargetTable := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "unlock tables")
+			}
+		}
+		cleanupTestData := func() {
+			dropIndex()
+			deleteTestRows()
+		}
+		restartWorkflow := func() {
+			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
+			require.NoError(t, err, "failed to start workflow: %v", err)
+		}
+		waitForTargetToCatchup := func() {
+			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+			waitForNoWorkflowLag(t, vc, targetKs, workflow)
+		}
+
+		// First let's test that the prechecks work as expected. We ALTER
+		// the table on the target shards to add a unique index on the name
+		// field.
+		addIndex()
+		// Then we replicate some test rows across the target shards by
+		// inserting them in the source keyspace.
+		addTestRows()
+		// Now the workflow should go into the error state and the lag should
+		// start to climb. So we sleep for three times the max lag duration that we
+		// will set for the SwitchTraffic call.
+		lagDuration := 3 * time.Second
+		time.Sleep(lagDuration * 3)
+		out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+			"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
+		// It should fail in the canSwitch() precheck.
+		require.Error(t, err)
+		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
+			workflow, lagDuration.String()), out)
+		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+		cleanupTestData()
+		// We have to restart the workflow again as the duplicate key error
+		// is a permanent/terminal one.
+		restartWorkflow()
+		waitForTargetToCatchup()
+
+		// Now let's test that the cancel works by setting the command timeout
+		// to a fraction (6s) of the default max repl lag duration (30s). First
+		// we lock the customer table on the target tablets so that we cannot
+		// apply the INSERTs and catch up.
+		lockTargetTable()
+		addTestRows()
+		timeout := lagDuration * 2 // 6s
+		// Use the default max-replication-lag-allowed value of 30s.
+		// We run the command in a goroutine so that we can unblock things
+		// after the timeout is reached -- as the vplayer query is blocking
+		// on the table lock in the MySQL layer.
+		wg := sync.WaitGroup{}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+				"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		}()
+		time.Sleep(timeout)
+		// Now we can unblock things and let it continue.
+		unlockTargetTable()
+		wg.Wait()
+		// It should fail due to the command context timeout and we should
+		// successfully cancel.
+		require.Error(t, err)
+		require.Contains(t, out, "failed to sync up replication between the source and target")
+		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+		deleteTestRows()
+		waitForTargetToCatchup()
+	})
+}
+
+>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 // restartWorkflow confirms that a workflow can be successfully
 // stopped and started.
 func restartWorkflow(t *testing.T, ksWorkflow string) {
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index cd2fb9f9cd3..e64578911af 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -3356,8 +3356,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			return handleError("failed to migrate the workflow streams", err)
 		}
 		if cancel {
-			sw.cancelMigration(ctx, sm)
-			return 0, sw.logs(), nil
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
+			return 0, sw.logs(), err
 		}
 
 		// We stop writes on the source before stopping the source streams so that the catchup time
@@ -3369,7 +3371,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		// we actually stop them.
 		ts.Logger().Infof("Stopping source writes")
 		if err := sw.stopSourceWrites(ctx); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 
@@ -3387,7 +3391,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 					ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
 				}
 			}
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 
@@ -3397,7 +3403,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			// the tablet's deny list check and the first mysqld side table lock.
 			for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 				if err := ts.executeLockTablesOnSource(ctx); err != nil {
-					sw.cancelMigration(ctx, sm)
+					if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+						err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+					}
 					return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 				}
 				// No need to UNLOCK the tables as the connection was closed once the locks were acquired
@@ -3407,26 +3415,39 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		}
 
 		ts.Logger().Infof("Waiting for streams to catchup")
+<<<<<<< HEAD
 		if err := sw.waitForCatchup(ctx, timeout); err != nil {
 			sw.cancelMigration(ctx, sm)
+=======
+		if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
+>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 			return handleError("failed to sync up replication between the source and target", err)
 		}
 
 		ts.Logger().Infof("Migrating streams")
 		if err := sw.migrateStreams(ctx, sm); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError("failed to migrate the workflow streams", err)
 		}
 
 		ts.Logger().Infof("Resetting sequences")
 		if err := sw.resetSequences(ctx); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError("failed to reset the sequences", err)
 		}
 
 		ts.Logger().Infof("Creating reverse streams")
 		if err := sw.createReverseVReplication(ctx); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError("failed to create the reverse vreplication streams", err)
 		}
 
@@ -3439,7 +3460,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
 			defer cancel()
 			if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-				sw.cancelMigration(ctx, sm)
+				if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+					err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+				}
 				return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 			}
 		}
@@ -3492,15 +3515,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
 	if err != nil {
 		return "", err
 	}
+	if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
+		return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
+	}
 	for _, stream := range wf.ShardStreams {
 		for _, st := range stream.GetStreams() {
 			if st.Message == Frozen {
 				return cannotSwitchFrozen, nil
 			}
-			// If no new events have been replicated after the copy phase then it will be 0.
-			if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
-				return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
-			}
 			switch st.State {
 			case binlogdatapb.VReplicationWorkflowState_Copying.String():
 				return cannotSwitchCopyIncomplete, nil
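
The canSwitch() hunk above replaces the per-stream time_updated comparison with a single check of the workflow-level MaxVReplicationTransactionLag before any per-stream state is inspected. A minimal, self-contained sketch of that precheck shape (a hypothetical helper, not the actual Server.canSwitch signature):

    // lag_precheck_sketch.go: shape of the reordered lag check (standalone
    // sketch; not the actual Server.canSwitch implementation).
    package main

    import "fmt"

    // canSwitchReason returns a non-empty, human-readable reason when traffic
    // must not be switched yet: the workflow-level transaction lag is checked
    // first, then per-stream states such as the copy phase.
    func canSwitchReason(maxTrxLagSecs, maxAllowedLagSecs int64, streamStates []string) string {
    	if maxTrxLagSecs > maxAllowedLagSecs {
    		return fmt.Sprintf("replication lag %ds is higher than allowed lag %ds", maxTrxLagSecs, maxAllowedLagSecs)
    	}
    	for _, state := range streamStates {
    		if state == "Copying" {
    			return "copy phase is still in progress"
    		}
    	}
    	return "" // empty reason: the switch may proceed
    }

    func main() {
    	fmt.Println(canSwitchReason(45, 30, []string{"Running"})) // lag too high
    	fmt.Println(canSwitchReason(5, 30, []string{"Running"}))  // empty line: OK to switch
    }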
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index b294ba1fcd0..480728d4936 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
 }
 
 // CancelStreamMigrations cancels the stream migrations.
-func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
+func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	if sm.streams == nil {
-		return
+		return nil
 	}
+	errs := &concurrency.AllErrorRecorder{}
 
-	_ = sm.deleteTargetStreams(ctx)
+	if err := sm.deleteTargetStreams(ctx); err != nil {
+		errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
+	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
 	// variant stopped.
@@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
 		return err
 	})
 	if err != nil {
+		errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
+	if errs.HasErrors() {
+		return errs.AggrError(vterrors.Aggregate)
+	}
+	return nil
 }
 
 // MigrateStreams migrates N streams
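
CancelStreamMigrations now records each cleanup failure and returns an aggregate instead of discarding errors. The same record-and-continue pattern can be sketched with the standard library's errors.Join standing in for AllErrorRecorder/vterrors.Aggregate (the step functions below are hypothetical):

    // cancel_aggregate_sketch.go: record-and-continue error handling, roughly
    // what CancelStreamMigrations now does, with errors.Join standing in for
    // AllErrorRecorder/vterrors.Aggregate.
    package main

    import (
    	"errors"
    	"fmt"
    )

    func cancelSketch(deleteTargetStreams, restartSourceStreams func() error) error {
    	var errs []error
    	if err := deleteTargetStreams(); err != nil {
    		errs = append(errs, fmt.Errorf("could not delete target streams: %w", err))
    	}
    	// Keep going even if the previous step failed so we undo as much as possible.
    	if err := restartSourceStreams(); err != nil {
    		errs = append(errs, fmt.Errorf("could not restart source streams: %w", err))
    	}
    	return errors.Join(errs...) // nil when every step succeeded
    }

    func main() {
    	err := cancelSketch(
    		func() error { return errors.New("tablet unreachable") },
    		func() error { return nil },
    	)
    	fmt.Println(err) // could not delete target streams: tablet unreachable
    }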
diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go
index aa41655aab8..513a0120611 100644
--- a/go/vt/vtctl/workflow/switcher.go
+++ b/go/vt/vtctl/workflow/switcher.go
@@ -122,8 +122,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
 	return sm.StopStreams(ctx)
 }
 
-func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
-	r.ts.cancelMigration(ctx, sm)
+func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
+	return r.ts.cancelMigration(ctx, sm)
 }
 
 func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go
index b8b1369bdf7..82577b46093 100644
--- a/go/vt/vtctl/workflow/switcher_dry_run.go
+++ b/go/vt/vtctl/workflow/switcher_dry_run.go
@@ -289,8 +289,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
 	return nil, nil
 }
 
-func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
 	dr.drLog.Log("Cancel migration as requested")
+	return nil
 }
 
 func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go
index 0780aaf484c..9588073ef27 100644
--- a/go/vt/vtctl/workflow/switcher_interface.go
+++ b/go/vt/vtctl/workflow/switcher_interface.go
@@ -24,8 +24,13 @@ import (
 )
 
 type iswitcher interface {
+<<<<<<< HEAD
 	lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error)
 	cancelMigration(ctx context.Context, sm *StreamMigrator)
+=======
+	lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
+	cancelMigration(ctx context.Context, sm *StreamMigrator) error
+>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 	stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
 	stopSourceWrites(ctx context.Context) error
 	waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index ecbc7af45f0..956693a20fb 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -713,7 +713,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri
 
 func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		return ts.switchDeniedTables(ctx)
+		return ts.switchDeniedTables(ctx, false)
 	}
 	return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites)
 }
@@ -1012,7 +1012,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
 func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 	var err error
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(ctx)
+		err = ts.switchDeniedTables(ctx, false)
 	} else {
 		err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
 	}
@@ -1033,17 +1033,27 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 }
 
 // switchDeniedTables switches the denied tables rules for the traffic switch.
-// They are removed on the source side and added on the target side.
-func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
+// They are added on the source side and removed on the target side.
+// If backward is true, then we swap this logic, removing on the source side
+// and adding on the target side. You would want to do that e.g. when canceling
+// a failed (and currently partial) traffic switch as we may have already
+// switched the denied tables entries and in any event we need to go back to
+// the original state.
+func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
 	if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
 		return nil
 	}
 
+	rmsource, rmtarget := false, true
+	if backward {
+		rmsource, rmtarget = true, false
+	}
+
 	egrp, ectx := errgroup.WithContext(ctx)
 	egrp.Go(func() error {
 		return ts.ForAllSources(func(source *MigrationSource) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
+				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1060,7 +1070,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
 	egrp.Go(func() error {
 		return ts.ForAllTargets(func(target *MigrationTarget) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
+				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1084,8 +1094,9 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
 
 // cancelMigration attempts to revert all changes made during the migration so that we can get back to the
 // state when traffic switching (or reversing) was initiated.
-func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
 	var err error
+	cancelErrs := &concurrency.AllErrorRecorder{}
 
 	if ctx.Err() != nil {
 		// Even though we create a new context later on we still record any context error:
@@ -1094,21 +1105,29 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	}
 
 	// We create a new context while canceling the migration, so that we are independent of the original
-	// context being cancelled prior to or during the cancel operation.
-	cmTimeout := 60 * time.Second
-	cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
+	// context being canceled prior to or during the cancel operation itself.
+	// First we create a copy of the parent context that keeps its values (so we retain the locks) but
+	// cannot be canceled by the parent context.
+	wcCtx := context.WithoutCancel(ctx)
+	// Now we create a child context from that which has a timeout.
+	cmTimeout := 2 * time.Minute
+	cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
 	defer cmCancel()
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(cmCtx)
+		err = ts.switchDeniedTables(cmCtx, true /* backward */)
 	} else {
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
 	if err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
 	}
 
-	sm.CancelStreamMigrations(cmCtx)
+	if err := sm.CancelStreamMigrations(cmCtx); err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
+		ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+	}
 
 	err = ts.ForAllTargets(func(target *MigrationTarget) error {
 		query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
@@ -1117,13 +1136,19 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		return err
 	})
 	if err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
 	}
 
-	err = ts.deleteReverseVReplication(cmCtx)
-	if err != nil {
+	if err := ts.deleteReverseVReplication(cmCtx); err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
 	}
+
+	if cancelErrs.HasErrors() {
+		return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
+	}
+	return nil
 }
 
 func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
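
cancelMigration now detaches from the caller's context with context.WithoutCancel so the cleanup can proceed (while keeping the lock information carried in the context's values) even when the original request context is already canceled, and it bounds that cleanup with its own two-minute timeout. A minimal sketch of that context-handling pattern, with a hypothetical doCleanup stand-in for the real revert work:

    // cancel_context_sketch.go: the context handling used by cancelMigration,
    // reduced to its essentials (doCleanup is a hypothetical stand-in).
    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    func cleanupSketch(ctx context.Context) error {
    	// The caller's context may already be canceled (often the reason we are
    	// cleaning up), so detach from its cancellation while keeping its values,
    	// then bound the cleanup with a fresh timeout.
    	detached := context.WithoutCancel(ctx)
    	cleanupCtx, cancel := context.WithTimeout(detached, 2*time.Minute)
    	defer cancel()
    	return doCleanup(cleanupCtx)
    }

    func doCleanup(ctx context.Context) error {
    	select {
    	case <-time.After(10 * time.Millisecond): // stand-in for the real revert work
    		return nil
    	case <-ctx.Done():
    		return ctx.Err()
    	}
    }

    func main() {
    	parent, cancel := context.WithCancel(context.Background())
    	cancel()                           // simulate an already-canceled request context
    	fmt.Println(cleanupSketch(parent)) // <nil>: cleanup still ran to completion
    }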
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
new file mode 100644
index 00000000000..a1b4393f2c0
--- /dev/null
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -0,0 +1,686 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+This file provides functions for fetching and retrieving information about VReplication workflows
+
+At the moment it is used by the `GetWorkflows` function in `server.go` and includes functionality to
+get the following:
+- Fetch workflows by shard
+- Fetch copy states by shard stream
+- Build workflows with metadata
+- Fetch stream logs
+*/
+
+package workflow
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"golang.org/x/exp/maps"
+	"golang.org/x/sync/errgroup"
+
+	"vitess.io/vitess/go/sets"
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/trace"
+	"vitess.io/vitess/go/vt/binlog/binlogplayer"
+	"vitess.io/vitess/go/vt/logutil"
+	"vitess.io/vitess/go/vt/sqlparser"
+	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/topoproto"
+	"vitess.io/vitess/go/vt/vtctl/workflow/common"
+	"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
+	"vitess.io/vitess/go/vt/vterrors"
+	"vitess.io/vitess/go/vt/vttablet/tmclient"
+
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
+	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+	vttimepb "vitess.io/vitess/go/vt/proto/vttime"
+)
+
+// workflowFetcher is responsible for fetching and retrieving information
+// about VReplication workflows.
+type workflowFetcher struct {
+	ts  *topo.Server
+	tmc tmclient.TabletManagerClient
+
+	logger logutil.Logger
+	parser *sqlparser.Parser
+}
+
+type workflowMetadata struct {
+	sourceKeyspace                string
+	sourceShards                  sets.Set[string]
+	targetKeyspace                string
+	targetShards                  sets.Set[string]
+	maxVReplicationLag            float64
+	maxVReplicationTransactionLag float64
+}
+
+var vrepLogQuery = strings.TrimSpace(`
+SELECT
+	id,
+	vrepl_id,
+	type,
+	state,
+	message,
+	created_at,
+	updated_at,
+	count
+FROM
+	_vt.vreplication_log
+WHERE vrepl_id IN %a
+ORDER BY
+	vrepl_id ASC,
+	id ASC
+`)
+
+func (wf *workflowFetcher) fetchWorkflowsByShard(
+	ctx context.Context,
+	req *vtctldatapb.GetWorkflowsRequest,
+) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
+	readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{}
+	if req.Workflow != "" {
+		readReq.IncludeWorkflows = []string{req.Workflow}
+	}
+	if req.ActiveOnly {
+		readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped}
+	}
+
+	m := sync.Mutex{}
+
+	shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards)
+	if err != nil {
+		return nil, err
+	}
+
+	results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards))
+
+	err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error {
+		primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias)
+		if err != nil {
+			return err
+		}
+		if primary == nil {
+			return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias))
+		}
+		// Clone the request so that we can set the correct DB name for tablet.
+		req := readReq.CloneVT()
+		wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req)
+		if err != nil {
+			return err
+		}
+		m.Lock()
+		defer m.Unlock()
+		results[primary] = wres
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
+
+func (wf *workflowFetcher) fetchCopyStatesByShardStream(
+	ctx context.Context,
+	workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
+) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) {
+	m := sync.Mutex{}
+
+	copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard))
+
+	fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error {
+		span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates")
+		defer span.Finish()
+
+		span.Annotate("shard", tablet.Shard)
+		span.Annotate("tablet_alias", tablet.AliasString())
+
+		copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds)
+		if err != nil {
+			return err
+		}
+
+		m.Lock()
+		defer m.Unlock()
+
+		for _, copyState := range copyStates {
+			shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId)
+			copyStatesByShardStreamId[shardStreamId] = append(
+				copyStatesByShardStreamId[shardStreamId],
+				copyState,
+			)
+		}
+
+		return nil
+	}
+
+	fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx)
+	for tablet, result := range workflowsByShard {
+		streamIds := make([]int32, 0, len(result.Workflows))
+		for _, wf := range result.Workflows {
+			for _, stream := range wf.Streams {
+				streamIds = append(streamIds, stream.Id)
+			}
+		}
+
+		if len(streamIds) == 0 {
+			continue
+		}
+
+		fetchCopyStatesEg.Go(func() error {
+			return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds)
+		})
+	}
+	if err := fetchCopyStatesEg.Wait(); err != nil {
+		return nil, err
+	}
+
+	return copyStatesByShardStreamId, nil
+}
+
+func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
+	span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates")
+	defer span.Finish()
+
+	span.Annotate("keyspace", tablet.Keyspace)
+	span.Annotate("shard", tablet.Shard)
+	span.Annotate("tablet_alias", tablet.AliasString())
+	span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds))
+
+	idsBV, err := sqltypes.BuildBindVariable(streamIds)
+	if err != nil {
+		return nil, err
+	}
+	query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)",
+		idsBV, idsBV)
+	if err != nil {
+		return nil, err
+	}
+	qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query)
+	if err != nil {
+		return nil, err
+	}
+
+	result := sqltypes.Proto3ToResult(qr)
+	if result == nil {
+		return nil, nil
+	}
+
+	copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows))
+	for i, row := range result.Named().Rows {
+		streamId, err := row["vrepl_id"].ToInt64()
+		if err != nil {
+			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err)
+		}
+		// These string fields are technically varbinary, but this is close enough.
+		copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{
+			StreamId: streamId,
+			Table:    row["table_name"].ToString(),
+			LastPk:   row["lastpk"].ToString(),
+		}
+	}
+
+	return copyStates, nil
+}
+
+func (wf *workflowFetcher) buildWorkflows(
+	ctx context.Context,
+	results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
+	copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
+	req *vtctldatapb.GetWorkflowsRequest,
+) ([]*vtctldatapb.Workflow, error) {
+	workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
+	workflowMetadataMap := make(map[string]*workflowMetadata, len(results))
+
+	for tablet, result := range results {
+		// In the old implementation, we knew we had at most one (0 <= N <= 1)
+		// workflow for each shard primary we queried. There might be multiple
+		// rows (streams) comprising that workflow, so we would aggregate the
+		// rows for a given primary into a single value ("the workflow",
+		// ReplicationStatusResult in the old types).
+		//
+		// In this version, we have many (N >= 0) workflows for each shard
+		// primary we queried, so we need to determine if each row corresponds
+		// to a workflow we're already aggregating, or if it's a workflow we
+		// haven't seen yet for that shard primary. We use the workflow name to
+		// dedupe for this.
+		for _, wfres := range result.Workflows {
+			workflowName := wfres.Workflow
+			workflow, ok := workflowsMap[workflowName]
+			if !ok {
+				workflow = &vtctldatapb.Workflow{
+					Name:         workflowName,
+					ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{},
+				}
+
+				workflowsMap[workflowName] = workflow
+				workflowMetadataMap[workflowName] = &workflowMetadata{
+					sourceShards: sets.New[string](),
+					targetShards: sets.New[string](),
+				}
+			}
+
+			metadata := workflowMetadataMap[workflowName]
+			err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	for name, workflow := range workflowsMap {
+		meta := workflowMetadataMap[name]
+		updateWorkflowWithMetadata(workflow, meta)
+
+		// Sort shard streams by stream_id ASC, to support an optimization
+		// in fetchStreamLogs below.
+		for _, shardStreams := range workflow.ShardStreams {
+			sort.Slice(shardStreams.Streams, func(i, j int) bool {
+				return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
+			})
+		}
+	}
+
+	if req.IncludeLogs {
+		var fetchLogsWG sync.WaitGroup
+
+		for _, workflow := range workflowsMap {
+			// Fetch logs for all streams associated with this workflow in the background.
+			fetchLogsWG.Add(1)
+			go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
+				defer fetchLogsWG.Done()
+				wf.fetchStreamLogs(ctx, req.Keyspace, workflow)
+			}(ctx, workflow)
+		}
+
+		// Wait for all the log fetchers to finish.
+		fetchLogsWG.Wait()
+	}
+
+	return maps.Values(workflowsMap), nil
+}
+
+func (wf *workflowFetcher) scanWorkflow(
+	ctx context.Context,
+	workflow *vtctldatapb.Workflow,
+	res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse,
+	tablet *topo.TabletInfo,
+	meta *workflowMetadata,
+	copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
+	keyspace string,
+) error {
+	shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
+	shardStream, ok := workflow.ShardStreams[shardStreamKey]
+	if !ok {
+		ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
+		defer cancel()
+
+		si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard)
+		if err != nil {
+			return err
+		}
+
+		shardStream = &vtctldatapb.Workflow_ShardStream{
+			Streams:          nil,
+			TabletControls:   si.TabletControls,
+			IsPrimaryServing: si.IsPrimaryServing,
+		}
+
+		workflow.ShardStreams[shardStreamKey] = shardStream
+	}
+
+	for _, rstream := range res.Streams {
+		// The value in the pos column can be compressed and thus not
+		// have a valid GTID consisting of valid UTF-8 characters so we
+		// have to decode it so that it's properly decompressed first
+		// when needed.
+		pos := rstream.Pos
+		if pos != "" {
+			mpos, err := binlogplayer.DecodePosition(pos)
+			if err != nil {
+				return err
+			}
+			pos = mpos.String()
+		}
+
+		cells := strings.Split(res.Cells, ",")
+		for i := range cells {
+			cells[i] = strings.TrimSpace(cells[i])
+		}
+		options := res.Options
+		if options != "" {
+			if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil {
+				return err
+			}
+		}
+
+		stream := &vtctldatapb.Workflow_Stream{
+			Id:                        int64(rstream.Id),
+			Shard:                     tablet.Shard,
+			Tablet:                    tablet.Alias,
+			BinlogSource:              rstream.Bls,
+			Position:                  pos,
+			StopPosition:              rstream.StopPos,
+			State:                     rstream.State.String(),
+			DbName:                    tablet.DbName(),
+			TabletTypes:               res.TabletTypes,
+			TabletSelectionPreference: res.TabletSelectionPreference,
+			Cells:                     cells,
+			TransactionTimestamp:      rstream.TransactionTimestamp,
+			TimeUpdated:               rstream.TimeUpdated,
+			Message:                   rstream.Message,
+			Tags:                      strings.Split(res.Tags, ","),
+			RowsCopied:                rstream.RowsCopied,
+			ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{
+				ComponentThrottled: rstream.ComponentThrottled,
+				TimeThrottled:      rstream.TimeThrottled,
+			},
+		}
+
+		// Merge in copy states, which we've already fetched.
+		shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id)
+		if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok {
+			stream.CopyStates = copyStates
+		}
+
+		if rstream.TimeUpdated == nil {
+			rstream.TimeUpdated = &vttimepb.Time{}
+		}
+
+		stream.State = getStreamState(stream, rstream)
+
+		shardStream.Streams = append(shardStream.Streams, stream)
+
+		meta.sourceShards.Insert(stream.BinlogSource.Shard)
+		meta.targetShards.Insert(tablet.Shard)
+
+		if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace {
+			return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace)
+		}
+
+		meta.sourceKeyspace = stream.BinlogSource.Keyspace
+
+		if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
+			return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
+		}
+
+		meta.targetKeyspace = tablet.Keyspace
+
+		if stream.TimeUpdated == nil {
+			stream.TimeUpdated = &vttimepb.Time{}
+		}
+		timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0)
+		vreplicationLag := time.Since(timeUpdated)
+
+		// MaxVReplicationLag represents the time since we last processed any event
+		// in the workflow.
+		if vreplicationLag.Seconds() > meta.maxVReplicationLag {
+			meta.maxVReplicationLag = vreplicationLag.Seconds()
+		}
+
+		workflow.WorkflowType = res.WorkflowType.String()
+		workflow.WorkflowSubType = res.WorkflowSubType.String()
+		workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
+
+		// MaxVReplicationTransactionLag estimates the max statement processing lag
+		// between the source and the target across all of the workflow streams.
+		transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
+		if transactionReplicationLag > meta.maxVReplicationTransactionLag {
+			meta.maxVReplicationTransactionLag = transactionReplicationLag
+		}
+	}
+
+	return nil
+}
+
+func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) {
+	workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{
+		Keyspace: meta.sourceKeyspace,
+		Shards:   sets.List(meta.sourceShards),
+	}
+
+	workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{
+		Keyspace: meta.targetKeyspace,
+		Shards:   sets.List(meta.targetShards),
+	}
+
+	workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag)
+	workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag)
+}
+
+func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) {
+	span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs")
+	defer span.Finish()
+
+	span.Annotate("keyspace", keyspace)
+	span.Annotate("workflow", workflow.Name)
+
+	vreplIDs := make([]int64, 0, len(workflow.ShardStreams))
+	for _, shardStream := range maps.Values(workflow.ShardStreams) {
+		for _, stream := range shardStream.Streams {
+			vreplIDs = append(vreplIDs, stream.Id)
+		}
+	}
+	idsBV, err := sqltypes.BuildBindVariable(vreplIDs)
+	if err != nil {
+		return
+	}
+
+	query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV)
+	if err != nil {
+		return
+	}
+
+	vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser)
+	results, err := vx.QueryContext(ctx, query)
+	if err != nil {
+		// Note that we do not return here. If there are any query results
+		// in the map (i.e. some tablets returned successfully), we will
+		// still try to read log rows from them on a best-effort basis. But,
+		// we will also pre-emptively record the top-level fetch error on
+		// every stream in every shard in the workflow. Further processing
+		// below may override the error message for certain streams.
+		for _, streams := range workflow.ShardStreams {
+			for _, stream := range streams.Streams {
+				stream.LogFetchError = err.Error()
+			}
+		}
+	}
+
+	for target, p3qr := range results {
+		qr := sqltypes.Proto3ToResult(p3qr)
+		shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString())
+
+		ss, ok := workflow.ShardStreams[shardStreamKey]
+		if !ok || ss == nil {
+			continue
+		}
+
+		streams := ss.Streams
+		streamIdx := 0
+		markErrors := func(err error) {
+			if streamIdx >= len(streams) {
+				return
+			}
+
+			streams[streamIdx].LogFetchError = err.Error()
+		}
+
+		for _, row := range qr.Named().Rows {
+			id, err := row["id"].ToCastInt64()
+			if err != nil {
+				markErrors(err)
+				continue
+			}
+
+			streamID, err := row["vrepl_id"].ToCastInt64()
+			if err != nil {
+				markErrors(err)
+				continue
+			}
+
+			typ := row["type"].ToString()
+			state := row["state"].ToString()
+			message := row["message"].ToString()
+
+			createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString())
+			if err != nil {
+				markErrors(err)
+				continue
+			}
+
+			updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString())
+			if err != nil {
+				markErrors(err)
+				continue
+			}
+
+			count, err := row["count"].ToCastInt64()
+			if err != nil {
+				markErrors(err)
+				continue
+			}
+
+			streamLog := &vtctldatapb.Workflow_Stream_Log{
+				Id:       id,
+				StreamId: streamID,
+				Type:     typ,
+				State:    state,
+				CreatedAt: &vttimepb.Time{
+					Seconds: createdAt.Unix(),
+				},
+				UpdatedAt: &vttimepb.Time{
+					Seconds: updatedAt.Unix(),
+				},
+				Message: message,
+				Count:   count,
+			}
+
+			// Earlier, in buildWorkflows, we sorted each ShardStreams
+			// slice by ascending id, and our _vt.vreplication_log query
+			// ordered by (stream_id ASC, id ASC), so we can walk the
+			// streams in index order in O(n) amortized over all the rows
+			// for this tablet.
+			for streamIdx < len(streams) {
+				stream := streams[streamIdx]
+				if stream.Id < streamLog.StreamId {
+					streamIdx++
+					continue
+				}
+
+				if stream.Id > streamLog.StreamId {
+					wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
+					// This can happen on manual/failed workflow cleanup so move to the next log.
+					break
+				}
+
+				// stream.Id == streamLog.StreamId
+				stream.Logs = append(stream.Logs, streamLog)
+				break
+			}
+		}
+	}
+}
+
+func (wf *workflowFetcher) forAllShards(
+	ctx context.Context,
+	keyspace string,
+	shards []string,
+	f func(ctx context.Context, shard *topo.ShardInfo) error,
+) error {
+	eg, egCtx := errgroup.WithContext(ctx)
+	for _, shard := range shards {
+		eg.Go(func() error {
+			si, err := wf.ts.GetShard(ctx, keyspace, shard)
+			if err != nil {
+				return err
+			}
+			if si.PrimaryAlias == nil {
+				return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard)
+			}
+
+			if err := f(egCtx, si); err != nil {
+				return err
+			}
+			return nil
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string {
+	switch {
+	case strings.Contains(strings.ToLower(stream.Message), "error"):
+		return binlogdatapb.VReplicationWorkflowState_Error.String()
+	case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0:
+		return binlogdatapb.VReplicationWorkflowState_Copying.String()
+	case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && time.Now().Unix()-rstream.TimeUpdated.Seconds > 10:
+		return binlogdatapb.VReplicationWorkflowState_Lagging.String()
+	}
+	return rstream.State.String()
+}
+
+// getVReplicationTrxLag estimates the actual statement processing lag between the
+// source and the target. If we are still processing source events it is the
+// difference between current time and the timestamp of the last event. If
+// heartbeats are more recent than the last event, then the lag is the time since
+// the last heartbeat as there can be an actual event immediately after the
+// heartbeat, but which has not yet been processed on the target. We don't allow
+// switching during the copy phase, so in that case we just return a large lag.
+// All timestamps are in seconds since epoch.
+func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+	if state == binlogdatapb.VReplicationWorkflowState_Copying {
+		return math.MaxInt64
+	}
+	if trxTs == nil {
+		trxTs = &vttimepb.Time{}
+	}
+	lastTransactionTime := trxTs.Seconds
+	if updatedTs == nil {
+		updatedTs = &vttimepb.Time{}
+	}
+	lastUpdatedTime := updatedTs.Seconds
+	if heartbeatTs == nil {
+		heartbeatTs = &vttimepb.Time{}
+	}
+	lastHeartbeatTime := heartbeatTs.Seconds
+	// We do NOT update the heartbeat timestamp when we are regularly updating the
+	// position as we replicate transactions (GTIDs).
+	// When we DO record a heartbeat, we set the updated time to the same value.
+	// When recording that we are throttled, we update the updated time but NOT
+	// the heartbeat time.
+	if lastTransactionTime == 0 /* No replicated events after copy */ ||
+		(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
+			lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
+		lastTransactionTime = lastUpdatedTime
+	}
+	now := time.Now().Unix() // Seconds since epoch
+	return float64(now - lastTransactionTime)
+}
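+
+// A small worked example of the lag rule above, restated with plain epoch
+// seconds (trxLagSeconds is a hypothetical helper, not the production
+// getVReplicationTrxLag), showing how the copy-phase and heartbeat cases
+// play out:
+//
+//	package main
+//
+//	import "fmt"
+//
+//	func trxLagSeconds(now, lastTrx, lastUpdated, lastHeartbeat int64, copying bool) int64 {
+//		if copying {
+//			return 1 << 62 // effectively infinite: switching is not allowed during copy
+//		}
+//		// If nothing was replicated after the copy phase, or the most recent
+//		// update came from a heartbeat newer than the last transaction (fully
+//		// caught up), measure the lag from the last update instead.
+//		if lastTrx == 0 || (lastUpdated == lastHeartbeat && lastUpdated > lastTrx) {
+//			lastTrx = lastUpdated
+//		}
+//		return now - lastTrx
+//	}
+//
+//	func main() {
+//		now := int64(1_000_000)
+//		fmt.Println(trxLagSeconds(now, now-40, now-40, now-300, false)) // 40: still applying events
+//		fmt.Println(trxLagSeconds(now, now-40, now-5, now-5, false))    // 5: only heartbeats recently
+//		fmt.Println(trxLagSeconds(now, 0, now-2, now-2, false))         // 2: no events since the copy phase
+//	}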

From 1627ef8b7c66140c282d2daef4ee0368c0e9ec92 Mon Sep 17 00:00:00 2001
From: Matt Lord <mattalord@gmail.com>
Date: Tue, 28 Jan 2025 14:37:23 -0500
Subject: [PATCH 2/2] Fix conflicts

Signed-off-by: Matt Lord <mattalord@gmail.com>
---
 .../vreplication/vreplication_test.go         |  74 +-
 go/vt/vtctl/workflow/server.go                |  77 +-
 go/vt/vtctl/workflow/switcher_interface.go    |   5 -
 go/vt/vtctl/workflow/workflows.go             | 686 ------------------
 4 files changed, 48 insertions(+), 794 deletions(-)
 delete mode 100644 go/vt/vtctl/workflow/workflows.go

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 5f580660b6d..51065a6d7f9 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -792,24 +792,17 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		switchReads(t, workflowType, cellNames, ksWorkflow, false)
 		assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)
 
-		var commit func(t *testing.T)
-		if withOpenTx {
-			commit, _ = vc.startQuery(t, openTxQuery)
-		}
 		switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
-<<<<<<< HEAD
-=======
-		shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards))
-		for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
-			shardNames = append(shardNames, shardName)
-		}
-		testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
 
 		testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
 			workflow, workflowType)
 
+		var commit func(t *testing.T)
+		if withOpenTx {
+			commit, _ = vc.startQuery(t, openTxQuery)
+		}
+
 		// Now let's confirm that it works as expected with an error.
->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 		switchWrites(t, workflowType, ksWorkflow, false)
 
 		checkThatVDiffFails(t, targetKs, workflow)
@@ -1625,62 +1618,6 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
 	validateDryRunResults(t, output, dryRunResults)
 }
 
-<<<<<<< HEAD
-=======
-// testSwitchTrafficPermissionChecks confirms that for the SwitchTraffic command, the
-// necessary permissions are checked properly on the source keyspace's primary tablets.
-// This ensures that we can create and manage the reverse vreplication workflow.
-func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
-	applyPrivileges := func(query string) {
-		for _, shard := range sourceShards {
-			primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
-			_, err := primary.QueryTablet(query, primary.Keyspace, false)
-			require.NoError(t, err)
-		}
-	}
-	runDryRunCmd := func(expectErr bool) {
-		_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
-			"SwitchTraffic", "--tablet-types=primary", "--dry-run")
-		require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
-	}
-
-	defer func() {
-		// Put the default global privs back in place.
-		applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
-	}()
-
-	t.Run("test switch traffic permission checks", func(t *testing.T) {
-		t.Run("test without global privileges", func(t *testing.T) {
-			applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
-			runDryRunCmd(true)
-		})
-
-		t.Run("test with db level privileges", func(t *testing.T) {
-			applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
-				sidecarDBIdentifier))
-			runDryRunCmd(false)
-		})
-
-		t.Run("test without global or db level privileges", func(t *testing.T) {
-			applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
-				sidecarDBIdentifier))
-			runDryRunCmd(true)
-		})
-
-		t.Run("test with table level privileges", func(t *testing.T) {
-			applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
-				sidecarDBIdentifier))
-			runDryRunCmd(false)
-		})
-
-		t.Run("test without global, db, or table level privileges", func(t *testing.T) {
-			applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
-				sidecarDBIdentifier))
-			runDryRunCmd(true)
-		})
-	})
-}
-
 // testSwitchWritesErrorHandling confirms that switching writes works as expected
 // in the face of vreplication lag (canSwitch() precheck) and when canceling the
 // switch due to replication failing to catch up in time.
@@ -1815,7 +1752,6 @@ func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []
 	})
 }
 
->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 // restartWorkflow confirms that a workflow can be successfully
 // stopped and started.
 func restartWorkflow(t *testing.T, ksWorkflow string) {
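
With the conflict resolved, the open-transaction setup runs only after the error-handling checks, immediately before the real write switch. A rough sketch of the resulting flow, assuming (as the captured commit callback implies, though the call itself is outside the hunks shown here) that the test commits the transaction once writes have switched:

	var commit func(t *testing.T)
	if withOpenTx {
		// Hold a transaction open on the source right before switching writes.
		commit, _ = vc.startQuery(t, openTxQuery)
	}
	switchWrites(t, workflowType, ksWorkflow, false)
	if withOpenTx && commit != nil {
		// Assumed: the open transaction is committed after the switch completes.
		commit(t)
	}
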
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index e64578911af..69bfdaea079 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -638,38 +638,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
 			workflow.WorkflowSubType = res.WorkflowSubType.String()
 			workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
 
-			// MaxVReplicationTransactionLag estimates the actual statement processing lag
-			// between the source and the target. If we are still processing source events it
-			// is the difference b/w current time and the timestamp of the last event. If
-			// heartbeats are more recent than the last event, then the lag is the time since
-			// the last heartbeat as there can be an actual event immediately after the
-			// heartbeat, but which has not yet been processed on the target.
-			// We don't allow switching during the copy phase, so in that case we just return
-			// a large lag. All timestamps are in seconds since epoch.
+			// MaxVReplicationTransactionLag estimates the max statement processing lag
+			// between the source and the target across all of the workflow streams.
 			if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok {
 				maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0
 			}
-			if rstream.TransactionTimestamp == nil {
-				rstream.TransactionTimestamp = &vttimepb.Time{}
-			}
-			lastTransactionTime := rstream.TransactionTimestamp.Seconds
-			if rstream.TimeHeartbeat == nil {
-				rstream.TimeHeartbeat = &vttimepb.Time{}
-			}
-			lastHeartbeatTime := rstream.TimeHeartbeat.Seconds
-			if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
-				maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64
-			} else {
-				if lastTransactionTime == 0 /* no new events after copy */ ||
-					lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {
-
-					lastTransactionTime = lastHeartbeatTime
-				}
-				now := time.Now().Unix() /* seconds since epoch */
-				transactionReplicationLag := float64(now - lastTransactionTime)
-				if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
-					maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
-				}
+			transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
+			if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
+				maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
 			}
 		}
 
@@ -3415,15 +3391,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		}
 
 		ts.Logger().Infof("Waiting for streams to catchup")
-<<<<<<< HEAD
 		if err := sw.waitForCatchup(ctx, timeout); err != nil {
-			sw.cancelMigration(ctx, sm)
-=======
-		if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
 			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
 				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
 			}
->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 			return handleError("failed to sync up replication between the source and target", err)
 		}
 
@@ -4072,3 +4043,41 @@ func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflo
 	}
 	return workflowStatus, nil
 }
+
+// getVReplicationTrxLag estimates the actual statement processing lag between the
+// source and the target. If we are still processing source events, the lag is the
+// difference between the current time and the timestamp of the last event. If
+// heartbeats are more recent than the last event, then the lag is the time since
+// the last heartbeat, as an event may have occurred immediately after the
+// heartbeat but not yet been processed on the target. We don't allow switching
+// during the copy phase, so in that case we simply return a very large lag.
+// All timestamps are in seconds since epoch.
+func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+	if state == binlogdatapb.VReplicationWorkflowState_Copying {
+		return math.MaxInt64
+	}
+	if trxTs == nil {
+		trxTs = &vttimepb.Time{}
+	}
+	lastTransactionTime := trxTs.Seconds
+	if updatedTs == nil {
+		updatedTs = &vttimepb.Time{}
+	}
+	lastUpdatedTime := updatedTs.Seconds
+	if heartbeatTs == nil {
+		heartbeatTs = &vttimepb.Time{}
+	}
+	lastHeartbeatTime := heartbeatTs.Seconds
+	// We do NOT update the heartbeat timestamp when we are regularly updating the
+	// position as we replicate transactions (GTIDs).
+	// When we DO record a heartbeat, we set the updated time to the same value.
+	// When recording that we are throttled, we update the updated time but NOT
+	// the heartbeat time.
+	if lastTransactionTime == 0 /* No replicated events after copy */ ||
+		(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
+			lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
+		lastTransactionTime = lastUpdatedTime
+	}
+	now := time.Now().Unix() // Seconds since epoch
+	return float64(now - lastTransactionTime)
+}
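
To make the new lag heuristic concrete, here is a minimal test-style sketch of the three cases it distinguishes. This is illustrative only and not part of the patch: it assumes it lives in the workflow package next to the helper above, and the timestamps are made up.

package workflow

import (
	"math"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	vttimepb "vitess.io/vitess/go/vt/proto/vttime"
)

// Illustrative sketch of the cases handled by getVReplicationTrxLag.
func TestGetVReplicationTrxLagSketch(t *testing.T) {
	now := time.Now().Unix()
	ts := func(s int64) *vttimepb.Time { return &vttimepb.Time{Seconds: s} }

	// Copy phase: switching is not allowed, so a huge lag is reported.
	lag := getVReplicationTrxLag(nil, nil, nil, binlogdatapb.VReplicationWorkflowState_Copying)
	require.Equal(t, float64(math.MaxInt64), lag)

	// Applying events but lagging: the row was updated recently, yet the last
	// applied transaction's source timestamp is ~30s old, so the lag is ~30s.
	lag = getVReplicationTrxLag(ts(now-30), ts(now-5), ts(now-60), binlogdatapb.VReplicationWorkflowState_Running)
	require.InDelta(t, 30, lag, 2)

	// Idle but caught up: the last update came from a heartbeat newer than the
	// last transaction, so the lag is measured from that heartbeat (~5s).
	lag = getVReplicationTrxLag(ts(now-300), ts(now-5), ts(now-5), binlogdatapb.VReplicationWorkflowState_Running)
	require.InDelta(t, 5, lag, 2)
}
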
diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go
index 9588073ef27..f5221bf94e1 100644
--- a/go/vt/vtctl/workflow/switcher_interface.go
+++ b/go/vt/vtctl/workflow/switcher_interface.go
@@ -24,13 +24,8 @@ import (
 )
 
 type iswitcher interface {
-<<<<<<< HEAD
 	lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error)
-	cancelMigration(ctx context.Context, sm *StreamMigrator)
-=======
-	lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
 	cancelMigration(ctx context.Context, sm *StreamMigrator) error
->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
 	stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
 	stopSourceWrites(ctx context.Context) error
 	waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
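
The interface change above only takes effect once callers handle the error that cancelMigration now returns; the switchWrites hunk earlier in this patch folds any cancellation failure into the original error so neither is dropped. A dependency-free sketch of that same combine-both-errors idea (plain Go, not Vitess code; the error strings are made up):

package main

import (
	"errors"
	"fmt"
)

// combineErrors mirrors the pattern used in the switchWrites hunk: when cleanup
// after a failure also fails, keep both messages instead of losing one.
func combineErrors(primary, cleanup error) error {
	if cleanup == nil {
		return primary
	}
	return fmt.Errorf("%v\n\n%v", primary, cleanup)
}

func main() {
	catchupErr := errors.New("replication did not catch up in time")
	cancelErr := errors.New("cancel migration also failed")
	fmt.Println(combineErrors(catchupErr, cancelErr))
}
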
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
deleted file mode 100644
index a1b4393f2c0..00000000000
--- a/go/vt/vtctl/workflow/workflows.go
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
-Copyright 2024 The Vitess Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-	http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-/*
-This file provides functions for fetching and retrieving information about VReplication workflows
-
-At the moment it is used by the `GetWorkflows` function in `server.go and includes functionality to
-get the following:
-- Fetch workflows by shard
-- Fetch copy states by shard stream
-- Build workflows with metadata
-- Fetch stream logs
-*/
-
-package workflow
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"math"
-	"sort"
-	"strings"
-	"sync"
-	"time"
-
-	"golang.org/x/exp/maps"
-	"golang.org/x/sync/errgroup"
-
-	"vitess.io/vitess/go/sets"
-	"vitess.io/vitess/go/sqltypes"
-	"vitess.io/vitess/go/trace"
-	"vitess.io/vitess/go/vt/binlog/binlogplayer"
-	"vitess.io/vitess/go/vt/logutil"
-	"vitess.io/vitess/go/vt/sqlparser"
-	"vitess.io/vitess/go/vt/topo"
-	"vitess.io/vitess/go/vt/topo/topoproto"
-	"vitess.io/vitess/go/vt/vtctl/workflow/common"
-	"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
-	"vitess.io/vitess/go/vt/vterrors"
-	"vitess.io/vitess/go/vt/vttablet/tmclient"
-
-	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
-	tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
-	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
-	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
-	vttimepb "vitess.io/vitess/go/vt/proto/vttime"
-)
-
-// workflowFetcher is responsible for fetching and retrieving information
-// about VReplication workflows.
-type workflowFetcher struct {
-	ts  *topo.Server
-	tmc tmclient.TabletManagerClient
-
-	logger logutil.Logger
-	parser *sqlparser.Parser
-}
-
-type workflowMetadata struct {
-	sourceKeyspace                string
-	sourceShards                  sets.Set[string]
-	targetKeyspace                string
-	targetShards                  sets.Set[string]
-	maxVReplicationLag            float64
-	maxVReplicationTransactionLag float64
-}
-
-var vrepLogQuery = strings.TrimSpace(`
-SELECT
-	id,
-	vrepl_id,
-	type,
-	state,
-	message,
-	created_at,
-	updated_at,
-	count
-FROM
-	_vt.vreplication_log
-WHERE vrepl_id IN %a
-ORDER BY
-	vrepl_id ASC,
-	id ASC
-`)
-
-func (wf *workflowFetcher) fetchWorkflowsByShard(
-	ctx context.Context,
-	req *vtctldatapb.GetWorkflowsRequest,
-) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
-	readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{}
-	if req.Workflow != "" {
-		readReq.IncludeWorkflows = []string{req.Workflow}
-	}
-	if req.ActiveOnly {
-		readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped}
-	}
-
-	m := sync.Mutex{}
-
-	shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards)
-	if err != nil {
-		return nil, err
-	}
-
-	results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards))
-
-	err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error {
-		primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias)
-		if err != nil {
-			return err
-		}
-		if primary == nil {
-			return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias))
-		}
-		// Clone the request so that we can set the correct DB name for tablet.
-		req := readReq.CloneVT()
-		wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req)
-		if err != nil {
-			return err
-		}
-		m.Lock()
-		defer m.Unlock()
-		results[primary] = wres
-		return nil
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	return results, nil
-}
-
-func (wf *workflowFetcher) fetchCopyStatesByShardStream(
-	ctx context.Context,
-	workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
-) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) {
-	m := sync.Mutex{}
-
-	copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard))
-
-	fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error {
-		span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates")
-		defer span.Finish()
-
-		span.Annotate("shard", tablet.Shard)
-		span.Annotate("tablet_alias", tablet.AliasString())
-
-		copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds)
-		if err != nil {
-			return err
-		}
-
-		m.Lock()
-		defer m.Unlock()
-
-		for _, copyState := range copyStates {
-			shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId)
-			copyStatesByShardStreamId[shardStreamId] = append(
-				copyStatesByShardStreamId[shardStreamId],
-				copyState,
-			)
-		}
-
-		return nil
-	}
-
-	fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx)
-	for tablet, result := range workflowsByShard {
-		streamIds := make([]int32, 0, len(result.Workflows))
-		for _, wf := range result.Workflows {
-			for _, stream := range wf.Streams {
-				streamIds = append(streamIds, stream.Id)
-			}
-		}
-
-		if len(streamIds) == 0 {
-			continue
-		}
-
-		fetchCopyStatesEg.Go(func() error {
-			return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds)
-		})
-	}
-	if err := fetchCopyStatesEg.Wait(); err != nil {
-		return nil, err
-	}
-
-	return copyStatesByShardStreamId, nil
-}
-
-func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
-	span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates")
-	defer span.Finish()
-
-	span.Annotate("keyspace", tablet.Keyspace)
-	span.Annotate("shard", tablet.Shard)
-	span.Annotate("tablet_alias", tablet.AliasString())
-	span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds))
-
-	idsBV, err := sqltypes.BuildBindVariable(streamIds)
-	if err != nil {
-		return nil, err
-	}
-	query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)",
-		idsBV, idsBV)
-	if err != nil {
-		return nil, err
-	}
-	qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query)
-	if err != nil {
-		return nil, err
-	}
-
-	result := sqltypes.Proto3ToResult(qr)
-	if result == nil {
-		return nil, nil
-	}
-
-	copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows))
-	for i, row := range result.Named().Rows {
-		streamId, err := row["vrepl_id"].ToInt64()
-		if err != nil {
-			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err)
-		}
-		// These string fields are technically varbinary, but this is close enough.
-		copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{
-			StreamId: streamId,
-			Table:    row["table_name"].ToString(),
-			LastPk:   row["lastpk"].ToString(),
-		}
-	}
-
-	return copyStates, nil
-}
-
-func (wf *workflowFetcher) buildWorkflows(
-	ctx context.Context,
-	results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse,
-	copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
-	req *vtctldatapb.GetWorkflowsRequest,
-) ([]*vtctldatapb.Workflow, error) {
-	workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
-	workflowMetadataMap := make(map[string]*workflowMetadata, len(results))
-
-	for tablet, result := range results {
-		// In the old implementation, we knew we had at most one (0 <= N <= 1)
-		// workflow for each shard primary we queried. There might be multiple
-		// rows (streams) comprising that workflow, so we would aggregate the
-		// rows for a given primary into a single value ("the workflow",
-		// ReplicationStatusResult in the old types).
-		//
-		// In this version, we have many (N >= 0) workflows for each shard
-		// primary we queried, so we need to determine if each row corresponds
-		// to a workflow we're already aggregating, or if it's a workflow we
-		// haven't seen yet for that shard primary. We use the workflow name to
-		// dedupe for this.
-		for _, wfres := range result.Workflows {
-			workflowName := wfres.Workflow
-			workflow, ok := workflowsMap[workflowName]
-			if !ok {
-				workflow = &vtctldatapb.Workflow{
-					Name:         workflowName,
-					ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{},
-				}
-
-				workflowsMap[workflowName] = workflow
-				workflowMetadataMap[workflowName] = &workflowMetadata{
-					sourceShards: sets.New[string](),
-					targetShards: sets.New[string](),
-				}
-			}
-
-			metadata := workflowMetadataMap[workflowName]
-			err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace)
-			if err != nil {
-				return nil, err
-			}
-		}
-	}
-
-	for name, workflow := range workflowsMap {
-		meta := workflowMetadataMap[name]
-		updateWorkflowWithMetadata(workflow, meta)
-
-		// Sort shard streams by stream_id ASC, to support an optimization
-		// in fetchStreamLogs below.
-		for _, shardStreams := range workflow.ShardStreams {
-			sort.Slice(shardStreams.Streams, func(i, j int) bool {
-				return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
-			})
-		}
-	}
-
-	if req.IncludeLogs {
-		var fetchLogsWG sync.WaitGroup
-
-		for _, workflow := range workflowsMap {
-			// Fetch logs for all streams associated with this workflow in the background.
-			fetchLogsWG.Add(1)
-			go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
-				defer fetchLogsWG.Done()
-				wf.fetchStreamLogs(ctx, req.Keyspace, workflow)
-			}(ctx, workflow)
-		}
-
-		// Wait for all the log fetchers to finish.
-		fetchLogsWG.Wait()
-	}
-
-	return maps.Values(workflowsMap), nil
-}
-
-func (wf *workflowFetcher) scanWorkflow(
-	ctx context.Context,
-	workflow *vtctldatapb.Workflow,
-	res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse,
-	tablet *topo.TabletInfo,
-	meta *workflowMetadata,
-	copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
-	keyspace string,
-) error {
-	shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
-	shardStream, ok := workflow.ShardStreams[shardStreamKey]
-	if !ok {
-		ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
-		defer cancel()
-
-		si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard)
-		if err != nil {
-			return err
-		}
-
-		shardStream = &vtctldatapb.Workflow_ShardStream{
-			Streams:          nil,
-			TabletControls:   si.TabletControls,
-			IsPrimaryServing: si.IsPrimaryServing,
-		}
-
-		workflow.ShardStreams[shardStreamKey] = shardStream
-	}
-
-	for _, rstream := range res.Streams {
-		// The value in the pos column can be compressed and thus not
-		// have a valid GTID consisting of valid UTF-8 characters so we
-		// have to decode it so that it's properly decompressed first
-		// when needed.
-		pos := rstream.Pos
-		if pos != "" {
-			mpos, err := binlogplayer.DecodePosition(pos)
-			if err != nil {
-				return err
-			}
-			pos = mpos.String()
-		}
-
-		cells := strings.Split(res.Cells, ",")
-		for i := range cells {
-			cells[i] = strings.TrimSpace(cells[i])
-		}
-		options := res.Options
-		if options != "" {
-			if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil {
-				return err
-			}
-		}
-
-		stream := &vtctldatapb.Workflow_Stream{
-			Id:                        int64(rstream.Id),
-			Shard:                     tablet.Shard,
-			Tablet:                    tablet.Alias,
-			BinlogSource:              rstream.Bls,
-			Position:                  pos,
-			StopPosition:              rstream.StopPos,
-			State:                     rstream.State.String(),
-			DbName:                    tablet.DbName(),
-			TabletTypes:               res.TabletTypes,
-			TabletSelectionPreference: res.TabletSelectionPreference,
-			Cells:                     cells,
-			TransactionTimestamp:      rstream.TransactionTimestamp,
-			TimeUpdated:               rstream.TimeUpdated,
-			Message:                   rstream.Message,
-			Tags:                      strings.Split(res.Tags, ","),
-			RowsCopied:                rstream.RowsCopied,
-			ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{
-				ComponentThrottled: rstream.ComponentThrottled,
-				TimeThrottled:      rstream.TimeThrottled,
-			},
-		}
-
-		// Merge in copy states, which we've already fetched.
-		shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id)
-		if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok {
-			stream.CopyStates = copyStates
-		}
-
-		if rstream.TimeUpdated == nil {
-			rstream.TimeUpdated = &vttimepb.Time{}
-		}
-
-		stream.State = getStreamState(stream, rstream)
-
-		shardStream.Streams = append(shardStream.Streams, stream)
-
-		meta.sourceShards.Insert(stream.BinlogSource.Shard)
-		meta.targetShards.Insert(tablet.Shard)
-
-		if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace {
-			return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace)
-		}
-
-		meta.sourceKeyspace = stream.BinlogSource.Keyspace
-
-		if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
-			return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
-		}
-
-		meta.targetKeyspace = tablet.Keyspace
-
-		if stream.TimeUpdated == nil {
-			stream.TimeUpdated = &vttimepb.Time{}
-		}
-		timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0)
-		vreplicationLag := time.Since(timeUpdated)
-
-		// MaxVReplicationLag represents the time since we last processed any event
-		// in the workflow.
-		if vreplicationLag.Seconds() > meta.maxVReplicationLag {
-			meta.maxVReplicationLag = vreplicationLag.Seconds()
-		}
-
-		workflow.WorkflowType = res.WorkflowType.String()
-		workflow.WorkflowSubType = res.WorkflowSubType.String()
-		workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
-
-		// MaxVReplicationTransactionLag estimates the max statement processing lag
-		// between the source and the target across all of the workflow streams.
-		transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
-		if transactionReplicationLag > meta.maxVReplicationTransactionLag {
-			meta.maxVReplicationTransactionLag = transactionReplicationLag
-		}
-	}
-
-	return nil
-}
-
-func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) {
-	workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{
-		Keyspace: meta.sourceKeyspace,
-		Shards:   sets.List(meta.sourceShards),
-	}
-
-	workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{
-		Keyspace: meta.targetKeyspace,
-		Shards:   sets.List(meta.targetShards),
-	}
-
-	workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag)
-	workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag)
-}
-
-func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) {
-	span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs")
-	defer span.Finish()
-
-	span.Annotate("keyspace", keyspace)
-	span.Annotate("workflow", workflow.Name)
-
-	vreplIDs := make([]int64, 0, len(workflow.ShardStreams))
-	for _, shardStream := range maps.Values(workflow.ShardStreams) {
-		for _, stream := range shardStream.Streams {
-			vreplIDs = append(vreplIDs, stream.Id)
-		}
-	}
-	idsBV, err := sqltypes.BuildBindVariable(vreplIDs)
-	if err != nil {
-		return
-	}
-
-	query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV)
-	if err != nil {
-		return
-	}
-
-	vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser)
-	results, err := vx.QueryContext(ctx, query)
-	if err != nil {
-		// Note that we do not return here. If there are any query results
-		// in the map (i.e. some tablets returned successfully), we will
-		// still try to read log rows from them on a best-effort basis. But,
-		// we will also pre-emptively record the top-level fetch error on
-		// every stream in every shard in the workflow. Further processing
-		// below may override the error message for certain streams.
-		for _, streams := range workflow.ShardStreams {
-			for _, stream := range streams.Streams {
-				stream.LogFetchError = err.Error()
-			}
-		}
-	}
-
-	for target, p3qr := range results {
-		qr := sqltypes.Proto3ToResult(p3qr)
-		shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString())
-
-		ss, ok := workflow.ShardStreams[shardStreamKey]
-		if !ok || ss == nil {
-			continue
-		}
-
-		streams := ss.Streams
-		streamIdx := 0
-		markErrors := func(err error) {
-			if streamIdx >= len(streams) {
-				return
-			}
-
-			streams[streamIdx].LogFetchError = err.Error()
-		}
-
-		for _, row := range qr.Named().Rows {
-			id, err := row["id"].ToCastInt64()
-			if err != nil {
-				markErrors(err)
-				continue
-			}
-
-			streamID, err := row["vrepl_id"].ToCastInt64()
-			if err != nil {
-				markErrors(err)
-				continue
-			}
-
-			typ := row["type"].ToString()
-			state := row["state"].ToString()
-			message := row["message"].ToString()
-
-			createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString())
-			if err != nil {
-				markErrors(err)
-				continue
-			}
-
-			updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString())
-			if err != nil {
-				markErrors(err)
-				continue
-			}
-
-			count, err := row["count"].ToCastInt64()
-			if err != nil {
-				markErrors(err)
-				continue
-			}
-
-			streamLog := &vtctldatapb.Workflow_Stream_Log{
-				Id:       id,
-				StreamId: streamID,
-				Type:     typ,
-				State:    state,
-				CreatedAt: &vttimepb.Time{
-					Seconds: createdAt.Unix(),
-				},
-				UpdatedAt: &vttimepb.Time{
-					Seconds: updatedAt.Unix(),
-				},
-				Message: message,
-				Count:   count,
-			}
-
-			// Earlier, in buildWorkflows, we sorted each ShardStreams
-			// slice by ascending id, and our _vt.vreplication_log query
-			// ordered by (stream_id ASC, id ASC), so we can walk the
-			// streams in index order in O(n) amortized over all the rows
-			// for this tablet.
-			for streamIdx < len(streams) {
-				stream := streams[streamIdx]
-				if stream.Id < streamLog.StreamId {
-					streamIdx++
-					continue
-				}
-
-				if stream.Id > streamLog.StreamId {
-					wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
-					// This can happen on manual/failed workflow cleanup so move to the next log.
-					break
-				}
-
-				// stream.Id == streamLog.StreamId
-				stream.Logs = append(stream.Logs, streamLog)
-				break
-			}
-		}
-	}
-}
-
-func (wf *workflowFetcher) forAllShards(
-	ctx context.Context,
-	keyspace string,
-	shards []string,
-	f func(ctx context.Context, shard *topo.ShardInfo) error,
-) error {
-	eg, egCtx := errgroup.WithContext(ctx)
-	for _, shard := range shards {
-		eg.Go(func() error {
-			si, err := wf.ts.GetShard(ctx, keyspace, shard)
-			if err != nil {
-				return err
-			}
-			if si.PrimaryAlias == nil {
-				return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard)
-			}
-
-			if err := f(egCtx, si); err != nil {
-				return err
-			}
-			return nil
-		})
-	}
-	if err := eg.Wait(); err != nil {
-		return err
-	}
-	return nil
-}
-
-func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string {
-	switch {
-	case strings.Contains(strings.ToLower(stream.Message), "error"):
-		return binlogdatapb.VReplicationWorkflowState_Error.String()
-	case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0:
-		return binlogdatapb.VReplicationWorkflowState_Copying.String()
-	case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10:
-		return binlogdatapb.VReplicationWorkflowState_Lagging.String()
-	}
-	return rstream.State.String()
-}
-
-// getVReplicationTrxLag estimates the actual statement processing lag between the
-// source and the target. If we are still processing source events it is the
-// difference between current time and the timestamp of the last event. If
-// heartbeats are more recent than the last event, then the lag is the time since
-// the last heartbeat as there can be an actual event immediately after the
-// heartbeat, but which has not yet been processed on the target. We don't allow
-// switching during the copy phase, so in that case we just return a large lag.
-// All timestamps are in seconds since epoch.
-func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
-	if state == binlogdatapb.VReplicationWorkflowState_Copying {
-		return math.MaxInt64
-	}
-	if trxTs == nil {
-		trxTs = &vttimepb.Time{}
-	}
-	lastTransactionTime := trxTs.Seconds
-	if updatedTs == nil {
-		updatedTs = &vttimepb.Time{}
-	}
-	lastUpdatedTime := updatedTs.Seconds
-	if heartbeatTs == nil {
-		heartbeatTs = &vttimepb.Time{}
-	}
-	lastHeartbeatTime := heartbeatTs.Seconds
-	// We do NOT update the heartbeat timestamp when we are regularly updating the
-	// position as we replicate transactions (GTIDs).
-	// When we DO record a heartbeat, we set the updated time to the same value.
-	// When recording that we are throttled, we update the updated time but NOT
-	// the heartbeat time.
-	if lastTransactionTime == 0 /* No replicated events after copy */ ||
-		(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
-			lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
-		lastTransactionTime = lastUpdatedTime
-	}
-	now := time.Now().Unix() // Seconds since epoch
-	return float64(now - lastTransactionTime)
-}