Skip to content

Commit

Permalink
Fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 29, 2025
1 parent c768679 commit 1627ef8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 794 deletions.
74 changes: 5 additions & 69 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,24 +792,17 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchReads(t, workflowType, cellNames, ksWorkflow, false)
assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
<<<<<<< HEAD
=======
shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards))
for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
workflow, workflowType)

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}

// Now let's confirm that it works as expected with an error.
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
Expand Down Expand Up @@ -1625,62 +1618,6 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

<<<<<<< HEAD
=======
// testSwitchTrafficPermissionsChecks confirms that for the SwitchTraffic command, the
// necessary permissions are checked properly on the source keyspace's primary tablets.
// This ensures that we can create and manage the reverse vreplication workflow.
func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
applyPrivileges := func(query string) {
for _, shard := range sourceShards {
primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
_, err := primary.QueryTablet(query, primary.Keyspace, false)
require.NoError(t, err)
}
}
runDryRunCmd := func(expectErr bool) {
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
}

defer func() {
// Put the default global privs back in place.
applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
}()

t.Run("test switch traffic permission checks", func(t *testing.T) {
t.Run("test without global privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
runDryRunCmd(true)
})

t.Run("test with db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global or db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})

t.Run("test with table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global, db, or table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})
})
}

// testSwitchWritesErrorHandling confirms that switching writes works as expected
// in the face of vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
Expand Down Expand Up @@ -1815,7 +1752,6 @@ func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []
})
}

>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
Expand Down
77 changes: 43 additions & 34 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,38 +638,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys

// MaxVReplicationTransactionLag estimates the actual statement processing lag
// between the source and the target. If we are still processing source events it
// is the difference b/w current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target.
// We don't allow switching during the copy phase, so in that case we just return
// a large lag. All timestamps are in seconds since epoch.
// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0
}
if rstream.TransactionTimestamp == nil {
rstream.TransactionTimestamp = &vttimepb.Time{}
}
lastTransactionTime := rstream.TransactionTimestamp.Seconds
if rstream.TimeHeartbeat == nil {
rstream.TimeHeartbeat = &vttimepb.Time{}
}
lastHeartbeatTime := rstream.TimeHeartbeat.Seconds
if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64
} else {
if lastTransactionTime == 0 /* no new events after copy */ ||
lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {

lastTransactionTime = lastHeartbeatTime
}
now := time.Now().Unix() /* seconds since epoch */
transactionReplicationLag := float64(now - lastTransactionTime)
if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
}
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
}
}

Expand Down Expand Up @@ -3415,15 +3391,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

ts.Logger().Infof("Waiting for streams to catchup")
<<<<<<< HEAD
if err := sw.waitForCatchup(ctx, timeout); err != nil {
sw.cancelMigration(ctx, sm)
=======
if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
return handleError("failed to sync up replication between the source and target", err)
}

Expand Down Expand Up @@ -4072,3 +4043,41 @@ func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflo
}
return workflowStatus, nil
}

// getVReplicationTrxLag estimates the actual statement processing lag between the
// source and the target. If we are still processing source events it is the
// difference between current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target. We don't allow
// switching during the copy phase, so in that case we just return a large lag.
// All timestamps are in seconds since epoch.
func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}
if trxTs == nil {
trxTs = &vttimepb.Time{}
}
lastTransactionTime := trxTs.Seconds
if updatedTs == nil {
updatedTs = &vttimepb.Time{}
}
lastUpdatedTime := updatedTs.Seconds
if heartbeatTs == nil {
heartbeatTs = &vttimepb.Time{}
}
lastHeartbeatTime := heartbeatTs.Seconds
// We do NOT update the heartbeat timestamp when we are regularly updating the
// position as we replicate transactions (GTIDs).
// When we DO record a heartbeat, we set the updated time to the same value.
// When recording that we are throttled, we update the updated time but NOT
// the heartbeat time.
if lastTransactionTime == 0 /* No replicated events after copy */ ||
(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
lastTransactionTime = lastUpdatedTime
}
now := time.Now().Unix() // Seconds since epoch
return float64(now - lastTransactionTime)
}
5 changes: 0 additions & 5 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ import (
)

type iswitcher interface {
<<<<<<< HEAD
lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator)
=======
lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator) error
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
Expand Down
Loading

0 comments on commit 1627ef8

Please sign in to comment.