Skip to content

Commit

Permalink
Fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 29, 2025
1 parent 756598d commit 3266f98
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 904 deletions.
73 changes: 4 additions & 69 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,24 +762,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchReads(t, workflowType, cellNames, ksWorkflow, false)
assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
<<<<<<< HEAD
=======
shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards))
for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
workflow, workflowType)

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
// Now let's confirm that it works as expected with an error.
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
Expand Down Expand Up @@ -1554,62 +1546,6 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

<<<<<<< HEAD
=======
// testSwitchTrafficPermissionsChecks confirms that for the SwitchTraffic command, the
// necessary permissions are checked properly on the source keyspace's primary tablets.
// This ensures that we can create and manage the reverse vreplication workflow.
func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
applyPrivileges := func(query string) {
for _, shard := range sourceShards {
primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
_, err := primary.QueryTablet(query, primary.Keyspace, false)
require.NoError(t, err)
}
}
runDryRunCmd := func(expectErr bool) {
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
}

defer func() {
// Put the default global privs back in place.
applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
}()

t.Run("test switch traffic permission checks", func(t *testing.T) {
t.Run("test without global privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
runDryRunCmd(true)
})

t.Run("test with db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global or db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})

t.Run("test with table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global, db, or table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})
})
}

// testSwitchWritesErrorHandling confirms that switching writes works as expected
// in the face of vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
Expand Down Expand Up @@ -1744,7 +1680,6 @@ func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []
})
}

>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
Expand Down
111 changes: 54 additions & 57 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,10 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows

targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace

timeUpdated := time.Unix(timeUpdatedSeconds, 0)
vreplicationLag := time.Since(timeUpdated)

// MaxVReplicationLag represents the time since we last processed any event
// in the workflow.
timeUpdated := time.Unix(timeUpdatedSeconds, 0)
vreplicationLag := time.Since(timeUpdated)
if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok {
if vreplicationLag.Seconds() > currentMaxLag {
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
Expand All @@ -701,32 +700,18 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
}

// MaxVReplicationTransactionLag estimates the actual statement processing lag
// between the source and the target. If we are still processing source events it
// is the difference b/w current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target.
// We don't allow switching during the copy phase, so in that case we just return
// a large lag. All timestamps are in seconds since epoch.
// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0
}
lastTransactionTime := transactionTimeSeconds
lastHeartbeatTime := timeHeartbeat
if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64
} else {
if lastTransactionTime == 0 /* no new events after copy */ ||
lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {

lastTransactionTime = lastHeartbeatTime
}
now := time.Now().Unix() /* seconds since epoch */
transactionReplicationLag := float64(now - lastTransactionTime)
if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
}
heartbeatTimestamp := &vttimepb.Time{
Seconds: timeHeartbeat,
}
transactionReplicationLag := getVReplicationTrxLag(stream.TransactionTimestamp, stream.TimeUpdated, heartbeatTimestamp,
binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[stream.State]))
if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
}

return nil
Expand Down Expand Up @@ -3248,24 +3233,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return 0, sw.logs(), err

Check warning on line 3233 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3230-L3233

Added lines #L3230 - L3233 were not covered by tests
}

<<<<<<< HEAD
=======
// We stop writes on the source before stopping the source streams so that the catchup time
// is lessened and other workflows that we have to migrate such as intra-keyspace materialize
// workflows also have a chance to catch up as well because those are internally generated
// GTIDs within the shards we're switching traffic away from.
// For intra-keyspace materialization streams that we migrate where the source and target are
// the keyspace being resharded, we wait for those to catchup in the stopStreams path before
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
ts.Logger().Infof("Stopping streams")
sourceWorkflows, err = sw.stopStreams(ctx, sm)
if err != nil {
Expand All @@ -3274,21 +3241,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
<<<<<<< HEAD
sw.cancelMigration(ctx, sm)
return handleError("failed to stop the workflow streams", err)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)

Check warning on line 3247 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3244-L3247

Added lines #L3244 - L3247 were not covered by tests
}

ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
=======
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}

Check warning on line 3254 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3252-L3254

Added lines #L3252 - L3254 were not covered by tests
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
Expand All @@ -3309,15 +3273,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

ts.Logger().Infof("Waiting for streams to catchup")
<<<<<<< HEAD
if err := sw.waitForCatchup(ctx, timeout); err != nil {
sw.cancelMigration(ctx, sm)
=======
if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}

Check warning on line 3279 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3277-L3279

Added lines #L3277 - L3279 were not covered by tests
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
return handleError("failed to sync up replication between the source and target", err)
}

Expand Down Expand Up @@ -3942,3 +3901,41 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea
}
return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate)
}

// getVReplicationTrxLag estimates the actual statement processing lag between the
// source and the target. If we are still processing source events it is the
// difference between current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target. We don't allow
// switching during the copy phase, so in that case we just return a large lag.
// All timestamps are in seconds since epoch.
func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}

Check warning on line 3916 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3915-L3916

Added lines #L3915 - L3916 were not covered by tests
if trxTs == nil {
trxTs = &vttimepb.Time{}
}

Check warning on line 3919 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3918-L3919

Added lines #L3918 - L3919 were not covered by tests
lastTransactionTime := trxTs.Seconds
if updatedTs == nil {
updatedTs = &vttimepb.Time{}
}

Check warning on line 3923 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3922-L3923

Added lines #L3922 - L3923 were not covered by tests
lastUpdatedTime := updatedTs.Seconds
if heartbeatTs == nil {
heartbeatTs = &vttimepb.Time{}
}

Check warning on line 3927 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3926-L3927

Added lines #L3926 - L3927 were not covered by tests
lastHeartbeatTime := heartbeatTs.Seconds
// We do NOT update the heartbeat timestamp when we are regularly updating the
// position as we replicate transactions (GTIDs).
// When we DO record a heartbeat, we set the updated time to the same value.
// When recording that we are throttled, we update the updated time but NOT
// the heartbeat time.
if lastTransactionTime == 0 /* No replicated events after copy */ ||
(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
lastTransactionTime = lastUpdatedTime
}
now := time.Now().Unix() // Seconds since epoch
return float64(now - lastTransactionTime)
}
5 changes: 0 additions & 5 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ import (
)

type iswitcher interface {
<<<<<<< HEAD
lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator)
=======
lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator) error
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
Expand Down
Loading

0 comments on commit 3266f98

Please sign in to comment.