Skip to content

Commit

Permalink
🚨Add error status for workflow (#1134)
Browse files Browse the repository at this point in the history
  • Loading branch information
awtkns authored Jul 24, 2023
1 parent 80c1581 commit 39b53e5
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 25 deletions.
2 changes: 1 addition & 1 deletion next/src/components/workflow/BasicEdge.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { WorkflowEdge } from "../../types/workflow";
const edgeColors = {
running: "yellow",
success: "green",
failure: "red",
error: "red",
};

const CustomEdge = ({
Expand Down
3 changes: 2 additions & 1 deletion next/src/components/workflow/nodes/AbstractNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const AbstractNode = (props: NodeProps) => (
"border-translucent w-[17em] rounded-md p-3 text-white shadow-xl shadow-stone-800 transition-colors duration-300",
props.selected ? "bg-zinc-800" : "bg-zinc-950 hover:bg-zinc-900",
props.status === "running" && "border border-amber-500",
props.status === "success" && "border border-green-500"
props.status === "success" && "border border-green-500",
props.status === "error" && "border border-red-500"
)}
>
{props.children}
Expand Down
4 changes: 2 additions & 2 deletions next/src/hooks/useWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { getNodeType, toReactFlowEdge, toReactFlowNode } from "../types/workflow

const StatusEventSchema = z.object({
nodeId: z.string(),
status: z.enum(["running", "success", "failure"]),
status: z.enum(["running", "success", "error"]),
remaining: z.number().optional(),
});

Expand Down Expand Up @@ -95,7 +95,7 @@ export const useWorkflow = (workflowId: string, session: Session | null) => {
updateValue(setNodes, "status", status, (n) => n?.id === nodeId);
updateValue(setEdges, "status", status, (e) => e?.target === nodeId);

if (remaining === 0) {
if (status === "error" || remaining === 0) {
setTimeout(() => {
updateValue(setNodes, "status", undefined);
updateValue(setEdges, "status", undefined);
Expand Down
2 changes: 1 addition & 1 deletion next/src/types/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const WorkflowEdgeSchema = z.object({
source: z.string(),
source_handle: z.string().optional().nullable(),
target: z.string(),
status: z.enum(["running", "success", "failure"]).optional(),
status: z.enum(["running", "success", "error"]).optional(),
});
export const WorkflowSchema = z.object({
id: z.string(),
Expand Down
22 changes: 2 additions & 20 deletions platform/reworkd_platform/services/worker/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from reworkd_platform.services.kafka.event_schemas import WorkflowTaskEvent
from reworkd_platform.services.kafka.producers.task_producer import WorkflowTaskProducer
from reworkd_platform.services.sockets import websockets
from reworkd_platform.services.worker.workflow_status import websocket_status
from reworkd_platform.web.api.workflow.blocks.web import get_block_runner


Expand All @@ -29,19 +29,11 @@ def __init__(self, producer: WorkflowTaskProducer, workflow: WorkflowTaskEvent):
async def start(self) -> None:
await self.producer.send(event=self.workflow)

@websocket_status
async def loop(self) -> None:
curr = self.workflow.queue.pop(0)
logger.info(f"Running task: {curr}")

websockets.emit(
self.workflow.workflow_id,
"workflow:node:status",
{
"nodeId": curr.id,
"status": "running",
},
)

curr.block = replace_templates(curr.block, self.workflow.outputs)

runner = get_block_runner(curr.block)
Expand All @@ -59,16 +51,6 @@ async def loop(self) -> None:
curr, (cast(IfOutput, outputs)).result
)

websockets.emit(
self.workflow.workflow_id,
"workflow:node:status",
{
"nodeId": curr.id,
"status": "success",
"remaining": len(self.workflow.queue),
},
)

if self.workflow.queue:
await self.start()
else:
Expand Down
55 changes: 55 additions & 0 deletions platform/reworkd_platform/services/worker/workflow_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from functools import wraps
from typing import TypeVar, Callable, Coroutine, Any

from reworkd_platform.services.sockets import websockets

STATUS_EVENT = "workflow:node:status"


_T = TypeVar("_T")


def websocket_status(
func: Callable[..., Coroutine[Any, Any, _T]]
) -> Callable[..., Coroutine[Any, Any, _T]]:
@wraps(func)
async def wrapper(engine: Any, *args: Any, **kwargs: Any) -> _T:
workflow_id = engine.workflow.workflow_id
node_id = engine.workflow.queue[0].id

websockets.emit(
workflow_id,
STATUS_EVENT,
{
"nodeId": node_id,
"status": "running",
},
)

try:
result = await func(engine, *args, **kwargs)
except Exception as e:
websockets.emit(
workflow_id,
STATUS_EVENT,
{
"nodeId": node_id,
"status": "error",
},
)
raise e

# Emit 'success' status at the end
websockets.emit(
workflow_id,
STATUS_EVENT,
{
"nodeId": node_id,
"status": "success",
"remaining": len(engine.workflow.queue),
},
)

return result

return wrapper

0 comments on commit 39b53e5

Please sign in to comment.