Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agents-api): Add temporal workflow lookup relation and queries #446

Merged
merged 4 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions agents-api/agents_api/autogen/Executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class TaskTokenResumeExecutionRequest(BaseModel):
populate_by_name=True,
)
status: Literal["running"] = "running"
task_token: str
"""
A Task Token is a unique identifier for a specific Task Execution.
"""
input: dict[str, Any] | None = None
"""
The input to resume the execution with
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/models/entry/test_entry_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _():
user_id = uuid4()
agent_id = uuid4()
session_id = uuid4()
tool_id = uuid4()
uuid4()
user_doc_id = uuid4()
agent_doc_id = uuid4()

Expand Down
23 changes: 22 additions & 1 deletion agents-api/agents_api/models/execution/create_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import HTTPException
from pycozo.client import QueryException
from pydantic import ValidationError
from temporalio.client import WorkflowHandle

from ...autogen.openapi_model import CreateExecutionRequest, Execution
from ...common.utils.cozo import cozo_process_mutate_data
Expand Down Expand Up @@ -39,6 +40,7 @@ def create_execution(
task_id: UUID,
execution_id: UUID | None = None,
data: Annotated[CreateExecutionRequest | dict, dict_like(CreateExecutionRequest)],
workflow_hande: WorkflowHandle,
) -> tuple[list[str], dict]:
execution_id = execution_id or uuid4()

Expand All @@ -53,6 +55,24 @@ def create_execution(
data["metadata"] = data.get("metadata", {})
execution_data = data

temporal_columns, temporal_values = cozo_process_mutate_data(
{
"execution_id": execution_id,
"id": workflow_hande.id,
"run_id": workflow_hande.run_id,
"first_execution_run_id": workflow_hande.first_execution_run_id,
"result_run_id": workflow_hande.result_run_id,
}
)

temporal_executions_lookup_query = f"""
?[{temporal_columns}] <- $temporal_values

:insert temporal_executions_lookup {{
{temporal_columns}
}}
"""

columns, values = cozo_process_mutate_data(
{
**execution_data,
Expand All @@ -79,7 +99,8 @@ def create_execution(
task_id=task_id,
parents=[("agents", "agent_id")],
),
temporal_executions_lookup_query,
insert_query,
]

return (queries, {"values": values})
return (queries, {"values": values, "temporal_values": temporal_values})
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from uuid import UUID

from beartype import beartype
from fastapi import HTTPException
from pycozo.client import QueryException
from pydantic import ValidationError

from ...autogen.openapi_model import Transition
from ..utils import (
cozo_query,
partialclass,
rewrap_exceptions,
verify_developer_id_query,
wrap_in_class,
)


@rewrap_exceptions(
{
QueryException: partialclass(HTTPException, status_code=400),
ValidationError: partialclass(HTTPException, status_code=400),
TypeError: partialclass(HTTPException, status_code=400),
AssertionError: partialclass(HTTPException, status_code=500),
}
)
@wrap_in_class(dict, one=True)
@cozo_query
@beartype
def get_paused_execution_token(
*,
developer_id: UUID,
execution_id: UUID,
) -> tuple[list[str], dict]:
execution_id = str(execution_id)

check_status_query = """
?[execution_id, status] :=
*executions {
execution_id,
status,
},
execution_id = to_uuid($execution_id),
status = "awaiting_input"

:assert some
"""

get_query = """
?[task_token, max(created_at)] :=
execution_id = to_uuid($execution_id),
*executions {
execution_id,
},
*transitions {
execution_id,
created_at,
task_token,
type,
},
type = "wait"

"""

get_query += filter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable filter is used here but it is not defined anywhere in this file. This will cause a NameError at runtime.


queries = [
verify_developer_id_query(developer_id),
get_query,
]

return (queries, {"execution_id": execution_id})
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from uuid import UUID

from beartype import beartype
from fastapi import HTTPException
from pycozo.client import QueryException
from pydantic import ValidationError

from ..utils import (
cozo_query,
partialclass,
rewrap_exceptions,
wrap_in_class,
)


@rewrap_exceptions(
{
QueryException: partialclass(HTTPException, status_code=400),
ValidationError: partialclass(HTTPException, status_code=400),
TypeError: partialclass(HTTPException, status_code=400),
}
)
@wrap_in_class(dict, one=True)
@cozo_query
@beartype
def get_temporal_workflow_data(
*,
execution_id: UUID,
) -> tuple[str, dict]:
# Executions are allowed direct GET access if they have execution_id

query = """
input[execution_id] <- [[to_uuid($execution_id)]]

?[id, run_id, result_run_id, first_execution_run_id] :=
input[execution_id],
*temporal_executions_lookup {
execution_id,
id,
run_id,
result_run_id,
first_execution_run_id,
}
"""

return (
query,
{
"execution_id": str(execution_id),
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

from agents_api.autogen.openapi_model import Execution, Transition

from ..agent.create_agent import create_agent
from ..task.create_task import create_task
from .create_execution import create_execution
from .create_execution_transition import create_execution_transition
from .get_execution import get_execution
Expand Down
2 changes: 0 additions & 2 deletions agents-api/agents_api/models/session/test_session_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

from agents_api.autogen.openapi_model import Session

from ..agent.create_agent import create_agent
from ..user.create_user import create_user
from .create_session import create_session
from .delete_session import delete_session
from .get_session import get_session
Expand Down
40 changes: 40 additions & 0 deletions agents-api/migrations/migrate_1722875101_add_temporal_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# /usr/bin/env python3

MIGRATION_ID = "add_temporal_mapping"
CREATED_AT = 1722875101.262791


def run(client, queries):
joiner = "}\n\n{"

query = joiner.join(queries)
query = f"{{\n{query}\n}}"
client.run(query)


create_temporal_executions_lookup = dict(
up="""
:create temporal_executions_lookup {
execution_id: Uuid,
id: String,
=>
run_id: String?,
first_execution_run_id: String?,
result_run_id: String?,
created_at: Float default now(),
}
""",
down="::remove temporal_executions_lookup",
)

queries = [
create_temporal_executions_lookup,
]


def up(client):
run(client, [q["up"] for q in queries])


def down(client):
run(client, [q["down"] for q in reversed(queries)])
Loading
Loading