Skip to content

Commit

Permalink
fix: (uninstall) check if connector has been uninstalled before sched…
Browse files Browse the repository at this point in the history
…uling more workflows
  • Loading branch information
laouji committed Jan 9, 2025
1 parent 8d2d106 commit 4418e36
Showing 1 changed file with 108 additions and 64 deletions.
172 changes: 108 additions & 64 deletions internal/connectors/engine/workflow/plugin_workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package workflow

import (
"encoding/json"
"fmt"

"github.com/formancehq/payments/internal/connectors/engine/activities"
Expand Down Expand Up @@ -99,80 +100,123 @@ func (w Workflow) run(
return fmt.Errorf("unknown task type: %v", task.TaskType)
}

if task.Periodically {
// Schedule next workflow every polling duration
// TODO(polo): context
var scheduleID string
if fromPayload == nil {
scheduleID = fmt.Sprintf("%s-%s-%s", w.stack, connectorID.String(), capability.String())
} else {
scheduleID = fmt.Sprintf("%s-%s-%s-%s", w.stack, connectorID.String(), capability.String(), fromPayload.ID)
}
connector, err := activities.StorageConnectorsGet(infiniteRetryContext(ctx), connectorID)
if err != nil {
return err
}

err := activities.StorageSchedulesStore(
infiniteRetryContext(ctx),
models.Schedule{
ID: scheduleID,
ConnectorID: connectorID,
CreatedAt: workflow.Now(ctx).UTC(),
})
if err != nil {
return err
}
// avoid scheduling next workflow if connector has been flagged for deletion
if connector.ScheduledForDeletion {
return nil
}

err = activities.TemporalScheduleCreate(
infiniteRetryContext(ctx),
activities.ScheduleCreateOptions{
ScheduleID: scheduleID,
Jitter: config.PollingPeriod / 2,
Interval: client.ScheduleIntervalSpec{
Every: config.PollingPeriod,
},
Action: client.ScheduleWorkflowAction{
// Use the same ID as the schedule ID, so we can identify the workflows running.
// This is useful for debugging purposes.
ID: scheduleID,
Workflow: nextWorkflow,
Args: []interface{}{
request,
task.NextTasks,
},
TaskQueue: w.getDefaultTaskQueue(),
},
Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP,
TriggerImmediately: true,
SearchAttributes: map[string]any{
SearchAttributeScheduleID: scheduleID,
SearchAttributeStack: w.stack,
},
},
// Schedule next workflow every polling duration
if task.Periodically {
// TODO(polo): context
err := w.scheduleNextWorkflow(
ctx,
connector,
capability,
task,
fromPayload,
nextWorkflow,
request,
)
if err != nil {
return err
return fmt.Errorf("failed to schedule periodic task: %w", err)
}
return nil
}

} else {
// Run next workflow immediately
if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
TaskQueue: w.getDefaultTaskQueue(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
SearchAttributes: map[string]interface{}{
SearchAttributeStack: w.stack,
},
// Run next workflow immediately
if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
TaskQueue: w.getDefaultTaskQueue(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
SearchAttributes: map[string]interface{}{
SearchAttributeStack: w.stack,

Check warning on line 139 in internal/connectors/engine/workflow/plugin_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/workflow/plugin_workflow.go#L132-L139

Added lines #L132 - L139 were not covered by tests
},
),
nextWorkflow,
request,
task.NextTasks,
).GetChildWorkflowExecution().Get(ctx, nil); err != nil {
return errors.Wrap(err, "running next workflow")
}
},
),
nextWorkflow,
request,
task.NextTasks,
).GetChildWorkflowExecution().Get(ctx, nil); err != nil {
return errors.Wrap(err, "running next workflow")

Check warning on line 147 in internal/connectors/engine/workflow/plugin_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/workflow/plugin_workflow.go#L141-L147

Added lines #L141 - L147 were not covered by tests
}
}
return nil
}

func (w Workflow) scheduleNextWorkflow(
ctx workflow.Context,
connector *models.Connector,
capability models.Capability,
task models.ConnectorTaskTree,
fromPayload *FromPayload,
nextWorkflow interface{},
request interface{},
) error {
var (
config models.Config
scheduleID string
)
if fromPayload == nil {
scheduleID = fmt.Sprintf("%s-%s-%s", w.stack, connector.ID.String(), capability.String())
} else {
scheduleID = fmt.Sprintf("%s-%s-%s-%s", w.stack, connector.ID.String(), capability.String(), fromPayload.ID)
}

// use most up-to-date configuration
if err := json.Unmarshal(connector.Config, &config); err != nil {
return err
}

Check warning on line 175 in internal/connectors/engine/workflow/plugin_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/workflow/plugin_workflow.go#L174-L175

Added lines #L174 - L175 were not covered by tests

connectorID := connector.ID
err := activities.StorageSchedulesStore(
infiniteRetryContext(ctx),
models.Schedule{
ID: scheduleID,
ConnectorID: connectorID,
CreatedAt: workflow.Now(ctx).UTC(),
})
if err != nil {
return err
}

err = activities.TemporalScheduleCreate(
infiniteRetryContext(ctx),
activities.ScheduleCreateOptions{
ScheduleID: scheduleID,
Jitter: config.PollingPeriod / 2,
Interval: client.ScheduleIntervalSpec{
Every: config.PollingPeriod,
},
Action: client.ScheduleWorkflowAction{
// Use the same ID as the schedule ID, so we can identify the workflows running.
// This is useful for debugging purposes.
ID: scheduleID,
Workflow: nextWorkflow,
Args: []interface{}{
request,
task.NextTasks,
},
TaskQueue: w.getDefaultTaskQueue(),
},
Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP,
TriggerImmediately: true,
SearchAttributes: map[string]any{
SearchAttributeScheduleID: scheduleID,
SearchAttributeStack: w.stack,
},
},
)
if err != nil {
return err
}

Check warning on line 218 in internal/connectors/engine/workflow/plugin_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/workflow/plugin_workflow.go#L217-L218

Added lines #L217 - L218 were not covered by tests
return nil
}

const Run = "Run"

0 comments on commit 4418e36

Please sign in to comment.