Skip to content

Commit

Permalink
feat(engine): add reverse transfer and payout
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Dec 16, 2024
1 parent 49f5e6d commit 69b50a9
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions internal/connectors/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ type Engine interface {
ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID, waitResult bool) (models.Task, error)
// Create a transfer between two accounts on the given connector (PSP).
CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error)
// Reverse a transfer on the given connector (PSP).
ReverseTransfer(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error)
// Create a payout on the given connector (PSP).
CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error)
// Reverse a payout on the given connector (PSP).
ReversePayout(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error)

// We received a webhook, handle it by calling the corresponding plugin to
// translate it to a formance object and store it.
Expand Down Expand Up @@ -487,6 +491,65 @@ func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiati
return task, nil
}

func (e *engine) ReverseTransfer(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error) {
ctx, span := otel.Tracer().Start(ctx, "engine.ReverseTransfer")
defer span.End()

ctx = context.WithoutCancel(ctx)
e.wg.Add(1)
defer e.wg.Done()

id := models.TaskIDReference(fmt.Sprintf("reverse-transfer-%s-%s", e.stack, reversal.CreatedAt.String()), reversal.ConnectorID, reversal.ID.String())
now := time.Now().UTC()
task := models.Task{
ID: models.TaskID{
Reference: id,
ConnectorID: reversal.ConnectorID,
},
ConnectorID: reversal.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
}
if err := e.storage.TasksUpsert(ctx, task); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}

run, err := e.temporalClient.ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: reversal.ConnectorID.String(),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
workflow.SearchAttributeStack: e.stack,
},
},
workflow.RunReverseTransfer,
workflow.ReverseTransfer{
TaskID: task.ID,
ConnectorID: reversal.ConnectorID,
PaymentInitiationReversalID: reversal.ID,
},
)
if err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}

if waitResult {
// Wait for bank account creation to complete
if err := run.Get(ctx, nil); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}
}

return task, nil
}

func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) {
ctx, span := otel.Tracer().Start(ctx, "engine.CreatePayout")
defer span.End()
Expand Down Expand Up @@ -544,6 +607,66 @@ func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiation
return task, nil
}

func (e *engine) ReversePayout(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error) {
ctx, span := otel.Tracer().Start(ctx, "engine.ReversePayout")
defer span.End()

ctx = context.WithoutCancel(ctx)
e.wg.Add(1)
defer e.wg.Done()

id := models.TaskIDReference(fmt.Sprintf("reverse-payout-%s-%s", e.stack, reversal.CreatedAt.String()), reversal.ConnectorID, reversal.ID.String())
now := time.Now().UTC()
task := models.Task{
ID: models.TaskID{
Reference: id,
ConnectorID: reversal.ConnectorID,
},
ConnectorID: reversal.ConnectorID,
Status: models.TASK_STATUS_PROCESSING,
CreatedAt: now,
UpdatedAt: now,
}

if err := e.storage.TasksUpsert(ctx, task); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}

run, err := e.temporalClient.ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: reversal.ConnectorID.String(),
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
workflow.SearchAttributeStack: e.stack,
},
},
workflow.RunReversePayout,
workflow.ReversePayout{
TaskID: task.ID,
ConnectorID: reversal.ConnectorID,
PaymentInitiationReversalID: reversal.ID,
},
)
if err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}

if waitResult {
// Wait for bank account creation to complete
if err := run.Get(ctx, nil); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
}
}

return task, nil
}

func (e *engine) HandleWebhook(ctx context.Context, urlPath string, webhook models.Webhook) error {
ctx, span := otel.Tracer().Start(ctx, "engine.HandleWebhook")
defer span.End()
Expand Down

0 comments on commit 69b50a9

Please sign in to comment.