Skip to content

Commit

Permalink
add error handling for workers creation
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Dec 18, 2024
1 parent b614a0b commit 079e038
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
45 changes: 36 additions & 9 deletions internal/connectors/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
ctx, span := otel.Tracer().Start(ctx, "engine.UninstallConnector")
defer span.End()

defaultWorker, err := e.workers.GetDefaultWorker()
if err != nil {
return models.Task{}, err
}

if err := e.workers.RemoveWorker(connectorID.String()); err != nil {
otel.RecordError(span, err)
return models.Task{}, err
Expand Down Expand Up @@ -224,11 +229,11 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
}

// Launch the uninstallation in background
_, err := e.temporalClient.ExecuteWorkflow(
_, err = e.temporalClient.ExecuteWorkflow(
detachedCtx,
client.StartWorkflowOptions{
ID: id,
TaskQueue: e.workers.GetDefaultWorker(),
TaskQueue: defaultWorker,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -238,7 +243,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn
workflow.RunUninstallConnector,
workflow.UninstallConnector{
ConnectorID: connectorID,
DefaultWorkerName: e.workers.GetDefaultWorker(),
DefaultWorkerName: defaultWorker,
TaskID: task.ID,
},
)
Expand Down Expand Up @@ -747,17 +752,22 @@ func (e *engine) CreatePool(ctx context.Context, pool models.Pool) error {
ctx, span := otel.Tracer().Start(ctx, "engine.CreatePool")
defer span.End()

defaultWorker, err := e.workers.GetDefaultWorker()
if err != nil {
return err
}

if err := e.storage.PoolsUpsert(ctx, pool); err != nil {
otel.RecordError(span, err)
return err
}

// Do not wait for sending of events
_, err := e.temporalClient.ExecuteWorkflow(
_, err = e.temporalClient.ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-creation-%s-%s", e.stack, pool.ID.String()),
TaskQueue: e.workers.GetDefaultWorker(),
TaskQueue: defaultWorker,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -781,6 +791,11 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m
ctx, span := otel.Tracer().Start(ctx, "engine.AddAccountToPool")
defer span.End()

defaultWorker, err := e.workers.GetDefaultWorker()
if err != nil {
return err
}

if err := e.storage.PoolsAddAccount(ctx, id, accountID); err != nil {
otel.RecordError(span, err)
return err
Expand All @@ -803,7 +818,7 @@ func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID m
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-add-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()),
TaskQueue: e.workers.GetDefaultWorker(),
TaskQueue: defaultWorker,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -827,6 +842,11 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun
ctx, span := otel.Tracer().Start(ctx, "engine.RemoveAccountFromPool")
defer span.End()

defaultWorker, err := e.workers.GetDefaultWorker()
if err != nil {
return err
}

if err := e.storage.PoolsRemoveAccount(ctx, id, accountID); err != nil {
otel.RecordError(span, err)
return err
Expand All @@ -849,7 +869,7 @@ func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accoun
detachedCtx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-remove-account-%s-%s-%s", e.stack, pool.ID.String(), accountID.String()),
TaskQueue: e.workers.GetDefaultWorker(),
TaskQueue: defaultWorker,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand All @@ -873,6 +893,11 @@ func (e *engine) DeletePool(ctx context.Context, poolID uuid.UUID) error {
ctx, span := otel.Tracer().Start(ctx, "engine.DeletePool")
defer span.End()

defaultWorker, err := e.workers.GetDefaultWorker()
if err != nil {
return err
}

deleted, err := e.storage.PoolsDelete(ctx, poolID)
if err != nil {
otel.RecordError(span, err)
Expand All @@ -885,7 +910,7 @@ func (e *engine) DeletePool(ctx context.Context, poolID uuid.UUID) error {
ctx,
client.StartWorkflowOptions{
ID: fmt.Sprintf("pools-deletion-%s-%s", e.stack, poolID.String()),
TaskQueue: e.workers.GetDefaultWorker(),
TaskQueue: defaultWorker,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowExecutionErrorWhenAlreadyStarted: false,
SearchAttributes: map[string]interface{}{
Expand Down Expand Up @@ -959,7 +984,9 @@ func (e *engine) OnStart(ctx context.Context) error {
// If we have at least one connector, we need to create the default worker
// to handle the possible tasks that are not related to a specific connector.
// (ex: pools, bank accounts, uninstallation etc...)
e.workers.CreateDefaultWorker()
if err := e.workers.CreateDefaultWorker(); err != nil {
return err
}
}

return nil
Expand Down
12 changes: 7 additions & 5 deletions internal/connectors/engine/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ func (w *Workers) getDefaultWorkerName() string {
return defaultWorker
}

func (w *Workers) CreateDefaultWorker() {
w.AddWorker(w.getDefaultWorkerName())
func (w *Workers) CreateDefaultWorker() error {
return w.AddWorker(w.getDefaultWorkerName())
}

// Returns the default worker name and create it if it doesn't exist yet.
func (w *Workers) GetDefaultWorker() string {
func (w *Workers) GetDefaultWorker() (string, error) {
defaultWorker := w.getDefaultWorkerName()
w.AddWorker(defaultWorker)
return defaultWorker
if err := w.AddWorker(defaultWorker); err != nil {
return "", err
}
return defaultWorker, nil
}

func NewWorkers(logger logging.Logger, stack string, temporalClient client.Client, workflows, activities []temporal.DefinitionSet, options worker.Options) *Workers {
Expand Down

0 comments on commit 079e038

Please sign in to comment.