Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782) #17795

Merged
merged 2 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,7 +3271,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
time.Sleep(lockTablesCycleDelay)
}
}

ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
Expand Down
19 changes: 13 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,9 +1003,9 @@
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())

Check warning on line 1006 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1006

Added line #L1006 was not covered by tests
}

ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())

Check warning on line 1008 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1008

Added line #L1008 was not covered by tests
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
Expand All @@ -1017,20 +1017,23 @@
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source tables", ts.WorkflowName())

Check warning on line 1020 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1020

Added line #L1020 was not covered by tests
err = ts.changeTableSourceWrites(cmCtx, allowWrites)
} else {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())

Check warning on line 1023 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1023

Added line #L1023 was not covered by tests
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)

Check warning on line 1028 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1028

Added line #L1028 was not covered by tests
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)

Check warning on line 1033 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1033

Added line #L1033 was not covered by tests
}

ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())

Check warning on line 1036 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1036

Added line #L1036 was not covered by tests
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
Expand All @@ -1039,17 +1042,21 @@
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)

Check warning on line 1045 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1045

Added line #L1045 was not covered by tests
}

ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())

Check warning on line 1048 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1048

Added line #L1048 was not covered by tests
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)

Check warning on line 1051 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1051

Added line #L1051 was not covered by tests
}

if cancelErrs.HasErrors() {
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))

Check warning on line 1055 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1055

Added line #L1055 was not covered by tests
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())

Check warning on line 1059 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1059

Added line #L1059 was not covered by tests
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

id, err := strconv.ParseInt(params["id"], 10, 32)
if err != nil {
return nil, err
}
ct.id = int32(id)
ct.workflow = params["workflow"]
log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)

ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)

state := params["state"]
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}

queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
Expand Down Expand Up @@ -251,7 +250,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
// one. This allows for the apply thread to catch up more quickly if
// a backlog builds up.
func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
Loading