Skip to content

Commit

Permalink
feat(connector): add task for uninstallation
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Dec 16, 2024
1 parent 9b4571e commit 0c9423e
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 62 deletions.
2 changes: 1 addition & 1 deletion internal/api/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Backend interface {
ConnectorsConfig(ctx context.Context, connectorID models.ConnectorID) (json.RawMessage, error)
ConnectorsList(ctx context.Context, query storage.ListConnectorsQuery) (*bunpaginate.Cursor[models.Connector], error)
ConnectorsInstall(ctx context.Context, provider string, config json.RawMessage) (models.ConnectorID, error)
ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) error
ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)
ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) error

// Payments
Expand Down
7 changes: 4 additions & 3 deletions internal/api/backend/backend_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions internal/api/services/connectors_uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"github.com/formancehq/payments/internal/models"
)

func (s *Service) ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) error {
func (s *Service) ConnectorsUninstall(ctx context.Context, connectorID models.ConnectorID) (models.Task, error) {
_, err := s.storage.ConnectorsGet(ctx, connectorID)
if err != nil {
return newStorageError(err, "get connector")
return models.Task{}, newStorageError(err, "get connector")
}

return handleEngineErrors(s.engine.UninstallConnector(ctx, connectorID))
task, err := s.engine.UninstallConnector(ctx, connectorID)
return task, handleEngineErrors(err)
}
2 changes: 1 addition & 1 deletion internal/api/v2/handler_connectors_uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func connectorsUninstall(backend backend.Backend) http.HandlerFunc {
return
}

if err := backend.ConnectorsUninstall(ctx, connectorID); err != nil {
if _, err := backend.ConnectorsUninstall(ctx, connectorID); err != nil {
otel.RecordError(span, err)
handleServiceErrors(w, r, err)
return
Expand Down
5 changes: 3 additions & 2 deletions internal/api/v3/handler_connectors_uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ func connectorsUninstall(backend backend.Backend) http.HandlerFunc {
return
}

if err := backend.ConnectorsUninstall(ctx, connectorID); err != nil {
task, err := backend.ConnectorsUninstall(ctx, connectorID)
if err != nil {
otel.RecordError(span, err)
api.InternalServerError(w, r, err)
return
}

api.Accepted(w, connectorID.String())
api.Accepted(w, task)
}
}
61 changes: 46 additions & 15 deletions internal/connectors/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Engine interface {
// Install a connector with the given provider and configuration.
InstallConnector(ctx context.Context, provider string, rawConfig json.RawMessage) (models.ConnectorID, error)
// Uninstall a connector with the given ID.
UninstallConnector(ctx context.Context, connectorID models.ConnectorID) error
UninstallConnector(ctx context.Context, connectorID models.ConnectorID) (models.Task, error)
// Reset a connector with the given ID, by uninstalling and reinstalling it.
ResetConnector(ctx context.Context, connectorID models.ConnectorID) error

Expand Down Expand Up @@ -185,25 +185,49 @@ func (e *engine) InstallConnector(ctx context.Context, provider string, rawConfi
return connector.ID, nil
}

func (e *engine) UninstallConnector(ctx context.Context, connectorID models.ConnectorID) error {
func (e *engine) UninstallConnector(ctx context.Context, connectorID models.ConnectorID) (models.Task, error) {
ctx, span := otel.Tracer().Start(ctx, "engine.UninstallConnector")
defer span.End()

if err := e.workers.RemoveWorker(connectorID.String()); err != nil {
otel.RecordError(span, err)
return err
return models.Task{}, err
}

if err := e.storage.ConnectorsScheduleForDeletion(ctx, connectorID); err != nil {
detachedCtx := context.WithoutCancel(ctx)
// Since we detached the context, we need to wait for the operation to finish
// even if the app is shutting down gracefully.
e.wg.Add(1)
defer e.wg.Done()

if err := e.storage.ConnectorsScheduleForDeletion(detachedCtx, connectorID); err != nil {
otel.RecordError(span, err)
return err
return models.Task{}, err
}

now := time.Now()
id := fmt.Sprintf("uninstall-%s-%s", e.stack, connectorID.String())
task := models.Task{
// Do not fill the connector ID as it will be deleted
ID: models.TaskID{
Reference: id,
ConnectorID: connectorID,
},
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
}

if err := e.storage.TasksUpsert(ctx, task); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}

// Launch the uninstallation in background
_, err := e.temporalClient.ExecuteWorkflow(
ctx,
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("uninstall-%s-%s", e.stack, connectorID.String()),
ID: id,
TaskQueue: e.workers.GetDefaultWorker(),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
WorkflowExecutionErrorWhenAlreadyStarted: false,
Expand All @@ -215,14 +239,21 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
workflow.UninstallConnector{
ConnectorID: connectorID,
DefaultWorkerName: e.workers.GetDefaultWorker(),
TaskID: task.ID,
},
)
if err != nil {
task.Status = models.TASK_STATUS_FAILED
task.UpdatedAt = time.Now()
if err := e.storage.TasksUpsert(ctx, task); err != nil {
e.logger.Errorf("failed to update task status to failed: %v", err)
}

otel.RecordError(span, err)
return err
return models.Task{}, err
}

return nil
return task, nil
}

func (e *engine) ResetConnector(ctx context.Context, connectorID models.ConnectorID) error {
Expand All @@ -243,7 +274,7 @@ func (e *engine) ResetConnector(ctx context.Context, connectorID models.Connecto
e.wg.Add(1)
defer e.wg.Done()

if err := e.UninstallConnector(detachedCtx, connectorID); err != nil {
if _, err := e.UninstallConnector(detachedCtx, connectorID); err != nil {
otel.RecordError(span, err)
return err
}
Expand Down Expand Up @@ -401,7 +432,7 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID
Reference: id,
ConnectorID: connectorID,
},
ConnectorID: connectorID,
ConnectorID: &connectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
Expand Down Expand Up @@ -458,7 +489,7 @@ func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiati
Reference: id,
ConnectorID: piID.ConnectorID,
},
ConnectorID: piID.ConnectorID,
ConnectorID: &piID.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
Expand Down Expand Up @@ -518,7 +549,7 @@ func (e *engine) ReverseTransfer(ctx context.Context, reversal models.PaymentIni
Reference: id,
ConnectorID: reversal.ConnectorID,
},
ConnectorID: reversal.ConnectorID,
ConnectorID: &reversal.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
Expand Down Expand Up @@ -576,7 +607,7 @@ func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiation
Reference: id,
ConnectorID: piID.ConnectorID,
},
ConnectorID: piID.ConnectorID,
ConnectorID: &piID.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
Expand Down Expand Up @@ -636,7 +667,7 @@ func (e *engine) ReversePayout(ctx context.Context, reversal models.PaymentIniti
Reference: id,
ConnectorID: reversal.ConnectorID,
},
ConnectorID: reversal.ConnectorID,
ConnectorID: &reversal.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/create_bank_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (w Workflow) runCreateBankAccount(
if errUpdateTask := w.updateTasksError(
ctx,
createBankAccount.TaskID,
createBankAccount.ConnectorID,
&createBankAccount.ConnectorID,
err,
); errUpdateTask != nil {
return errUpdateTask
Expand All @@ -35,7 +35,7 @@ func (w Workflow) runCreateBankAccount(
return w.updateTaskSuccess(
ctx,
createBankAccount.TaskID,
createBankAccount.ConnectorID,
&createBankAccount.ConnectorID,
accountID,
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/create_payout.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (w Workflow) runCreatePayout(
errUpdateTask := w.updateTasksError(
ctx,
createPayout.TaskID,
createPayout.ConnectorID,
&createPayout.ConnectorID,
err,
)
if errUpdateTask != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (w Workflow) createPayout(
return w.updateTaskSuccess(
ctx,
createPayout.TaskID,
createPayout.ConnectorID,
&createPayout.ConnectorID,
payment.ID.String(),
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/create_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (w Workflow) runCreateTransfer(
errUpdateTask := w.updateTasksError(
ctx,
createTransfer.TaskID,
createTransfer.ConnectorID,
&createTransfer.ConnectorID,
err,
)
if errUpdateTask != nil {
Expand Down Expand Up @@ -99,7 +99,7 @@ func (w Workflow) createTransfer(
return w.updateTaskSuccess(
ctx,
createTransfer.TaskID,
createTransfer.ConnectorID,
&createTransfer.ConnectorID,
payment.ID.String(),
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/poll_payout.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func (w Workflow) runPollPayout(
return w.updateTasksError(
ctx,
pollPayout.TaskID,
pollPayout.ConnectorID,
&pollPayout.ConnectorID,
err,
)
}
return w.updateTaskSuccess(
ctx,
pollPayout.TaskID,
pollPayout.ConnectorID,
&pollPayout.ConnectorID,
paymentID,
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/poll_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func (w Workflow) runPollTransfer(
return w.updateTasksError(
ctx,
pollTransfer.TaskID,
pollTransfer.ConnectorID,
&pollTransfer.ConnectorID,
err,
)
}
return w.updateTaskSuccess(
ctx,
pollTransfer.TaskID,
pollTransfer.ConnectorID,
&pollTransfer.ConnectorID,
paymentID,
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/reverse_payout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (w Workflow) runReversePayout(
errUpdateTask := w.updateTasksError(
ctx,
reversePayout.TaskID,
reversePayout.ConnectorID,
&reversePayout.ConnectorID,
err,
)
if errUpdateTask != nil {
Expand All @@ -34,7 +34,7 @@ func (w Workflow) runReversePayout(
return w.updateTaskSuccess(
ctx,
reversePayout.TaskID,
reversePayout.ConnectorID,
&reversePayout.ConnectorID,
paymentID,
)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/reverse_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (w Workflow) runReverseTransfer(
errUpdateTask := w.updateTasksError(
ctx,
reverseTransfer.TaskID,
reverseTransfer.ConnectorID,
&reverseTransfer.ConnectorID,
err,
)
if errUpdateTask != nil {
Expand All @@ -34,7 +34,7 @@ func (w Workflow) runReverseTransfer(
return w.updateTaskSuccess(
ctx,
reverseTransfer.TaskID,
reverseTransfer.ConnectorID,
&reverseTransfer.ConnectorID,
paymentID,
)
}
Expand Down
27 changes: 27 additions & 0 deletions internal/connectors/engine/workflow/uninstall_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,38 @@ import (
type UninstallConnector struct {
ConnectorID models.ConnectorID
DefaultWorkerName string
TaskID models.TaskID
}

func (w Workflow) runUninstallConnector(
ctx workflow.Context,
uninstallConnector UninstallConnector,
) error {
err := w.uninstallConnector(ctx, uninstallConnector)
if err != nil {
if errUpdateTask := w.updateTasksError(
ctx,
uninstallConnector.TaskID,
&uninstallConnector.ConnectorID,
err,
); errUpdateTask != nil {
return errUpdateTask
}

return err
}

return w.updateTaskSuccess(
ctx,
uninstallConnector.TaskID,
&uninstallConnector.ConnectorID,
uninstallConnector.ConnectorID.String(),
)
}

func (w Workflow) uninstallConnector(
ctx workflow.Context,
uninstallConnector UninstallConnector,
) error {
// First, terminate all schedules in order to prevent any workflows
// to be launched again.
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/engine/workflow/update_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func (w Workflow) updateTasksError(
ctx workflow.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
connectorID *models.ConnectorID,
err error,
) error {
return activities.StorageTasksStore(
Expand All @@ -26,7 +26,7 @@ func (w Workflow) updateTasksError(
func (w Workflow) updateTaskSuccess(
ctx workflow.Context,
taskID models.TaskID,
connectorID models.ConnectorID,
connectorID *models.ConnectorID,
relatedObjectID string,
) error {
return activities.StorageTasksStore(
Expand Down
Loading

0 comments on commit 0c9423e

Please sign in to comment.