diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ad2009da2a8..42f6a114b13 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2915,8 +2915,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to migrate the workflow streams", err) } if cancel { - sw.cancelMigration(ctx, sm) - return 0, sw.logs(), nil + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } + return 0, sw.logs(), err } // We stop writes on the source before stopping the source streams so that the catchup time @@ -2928,7 +2930,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // we actually stop them. ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } @@ -2946,7 +2950,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) } } - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr)) + } return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } @@ -2956,7 +2962,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // the tablet's deny list check and the first mysqld side table lock. for cnt := 1; cnt <= lockTablesCycles; cnt++ { if err := ts.executeLockTablesOnSource(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr)) + } return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err) } // No need to UNLOCK the tables as the connection was closed once the locks were acquired @@ -2977,7 +2985,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, waitTimeout); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to sync up replication between the source and target", err) } @@ -2986,7 +2996,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to migrate the workflow streams", err) } @@ -2995,7 +3007,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to reset the sequences", err) } @@ -3004,7 +3018,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to create the reverse vreplication streams", err) } @@ -3019,7 +3035,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) } } diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index 789822b5be9..03d1a413e98 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -124,8 +124,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin return sm.StopStreams(ctx) } -func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { - r.ts.cancelMigration(ctx, sm) +func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error { + return r.ts.cancelMigration(ctx, sm) } func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 29c40f85a69..c7f66b93e14 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -301,8 +301,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) ( return nil, nil } -func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) { +func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error { dr.drLog.Log("Cancel migration as requested") + return nil } func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) { diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 560b7a695fd..e780add1a2c 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -27,7 +27,7 @@ import ( type iswitcher interface { lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) - cancelMigration(ctx context.Context, sm *StreamMigrator) + cancelMigration(ctx context.Context, sm *StreamMigrator) error stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 8b035d9c6c5..2a5e982270e 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1137,8 +1137,9 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error { // cancelMigration attempts to revert all changes made during the migration so that we can get back to the // state when traffic switching (or reversing) was initiated. -func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { +func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error { var err error + cancelErrs := &concurrency.AllErrorRecorder{} if ctx.Err() != nil { // Even though we create a new context later on we still record any context error: @@ -1162,6 +1163,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { + cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) } @@ -1174,13 +1176,20 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat return err }) if err != nil { + cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } err = ts.deleteReverseVReplication(cmCtx) if err != nil { + cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) } + + if cancelErrs.HasErrors() { + return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary") + } + return nil } func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {