Skip to content

Commit

Permalink
Adds prefect-cloud run [deployment] (#12)
Browse files Browse the repository at this point in the history
* Adds `prefect run [deployment]`

Closes ENG-1293

* Updates from editor
  • Loading branch information
chrisguidry authored Feb 6, 2025
1 parent 8d5d62e commit aa4e043
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 25 deletions.
26 changes: 21 additions & 5 deletions src/prefect_cloud/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
from rich.text import Text

from prefect_cloud import auth, deployments
from prefect_cloud.client import (
get_cloud_api_url,
get_prefect_cloud_client,
)
from prefect_cloud.client import get_prefect_cloud_client
from prefect_cloud.dependencies import get_dependencies
from prefect_cloud.github import (
FileNotFound,
Expand Down Expand Up @@ -134,6 +131,8 @@ async def deploy(
# TODO: put back flowify if this a public repo? need to figure that out.
]

_, api_url, _ = auth.get_cloud_urls_or_login()

deployment_id = await client.create_managed_deployment(
deployment_name,
github_ref.filepath,
Expand All @@ -143,7 +142,7 @@ async def deploy(
parameter_schema,
job_variables={
"pip_packages": get_dependencies(dependencies or []),
"env": {"PREFECT_CLOUD_API_URL": get_cloud_api_url()} | env_vars,
"env": {"PREFECT_CLOUD_API_URL": api_url} | env_vars,
},
)

Expand All @@ -161,6 +160,23 @@ async def deploy(
)


@app.command()
async def run(
deployment: str = typer.Argument(
...,
help="The deployment to run (either its name or ID).",
),
):
ui_url, _, _ = auth.get_cloud_urls_or_login()
flow_run = await deployments.run(deployment)
flow_run_url = f"{ui_url}/runs/flow-run/{flow_run.id}"
app.console.print(
f"Flow run [bold]{flow_run.name}[/bold] [dim]({flow_run.id})[/dim] created",
f"and will begin running soon.\n"
f"[link={flow_run_url}]View its progress on Prefect Cloud[/link].",
)


@app.command()
async def ls():
context = await deployments.list()
Expand Down
20 changes: 6 additions & 14 deletions src/prefect_cloud/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from logging import getLogger
from typing import Any
from uuid import UUID

Expand All @@ -7,11 +8,11 @@
BlockDocumentUpdate,
WorkPoolCreate,
)
from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterType
from prefect.exceptions import ObjectNotFound
from prefect.settings import (
PREFECT_API_URL,
from prefect.client.schemas.filters import (
WorkPoolFilter,
WorkPoolFilterType,
)
from prefect.exceptions import ObjectNotFound
from prefect.utilities.callables import ParameterSchema
from prefect.workers.utilities import (
get_default_base_job_template_for_infrastructure_type,
Expand All @@ -22,16 +23,7 @@

PREFECT_MANAGED = "prefect:managed"


# TODO: temporary remove
def get_cloud_api_url():
url = PREFECT_API_URL.value()
if url.startswith("https://api.prefect.dev/api"):
return "https://api.prefect.dev/api"
elif url.startswith("https://api.stg.prefect.dev/api"):
return "https://api.stg.prefect.dev/api"
else:
return "https://api.prefect.cloud/api"
logger = getLogger(__name__)


class PrefectCloudClient(PrefectClient):
Expand Down
7 changes: 7 additions & 0 deletions src/prefect_cloud/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ async def _get_deployment(deployment_: str) -> DeploymentResponse:
return await client.read_deployment(deployment_id)


async def run(deployment_: str) -> FlowRun:
deployment = await _get_deployment(deployment_)

async with get_prefect_cloud_client() as client:
return await client.create_flow_run_from_deployment(deployment.id)


async def schedule(deployment_: str, schedule: str):
deployment = await _get_deployment(deployment_)

Expand Down
132 changes: 132 additions & 0 deletions tests/test_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from uuid import UUID, uuid4

import prefect.main # noqa: F401
import pytest
import respx
from httpx import Response
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.responses import DeploymentResponse
from prefect.exceptions import ObjectNotFound

from prefect_cloud import deployments


@pytest.fixture
def account() -> UUID:
return UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")


@pytest.fixture
def workspace() -> UUID:
return UUID("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")


@pytest.fixture
def api_url(account: UUID, workspace: UUID) -> str:
return f"https://api.prefect.cloud/api/accounts/{account}/workspaces/{workspace}"


@pytest.fixture(autouse=True)
def mock_get_cloud_urls_or_login(
monkeypatch: pytest.MonkeyPatch, account: UUID, workspace: UUID, api_url: str
):
def mock_urls():
return (
f"https://app.prefect.cloud/account/{account}/workspace/{workspace}",
api_url,
"test_api_key",
)

monkeypatch.setattr("prefect_cloud.auth.get_cloud_urls_or_login", mock_urls)


@pytest.fixture
def mock_deployment() -> DeploymentResponse:
return DeploymentResponse(
id=uuid4(),
flow_id=uuid4(),
name="test-deployment",
schedules=[],
is_schedule_active=True,
created=None,
updated=None,
)


@pytest.fixture
def mock_flow_run() -> FlowRun:
return FlowRun(
id=uuid4(),
deployment_id=uuid4(),
flow_id=uuid4(),
name="test-run",
state_type="SCHEDULED",
expected_start_time=None,
created=None,
updated=None,
)


async def test_run_deployment_by_id(
cloud_api: respx.Router,
mock_deployment: DeploymentResponse,
mock_flow_run: FlowRun,
api_url: str,
):
"""run() should create a flow run when given a deployment ID"""
mock_flow_run.deployment_id = mock_deployment.id

cloud_api.get(f"{api_url}/deployments/{mock_deployment.id}").mock(
return_value=Response(200, json=mock_deployment.model_dump(mode="json"))
)
cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/flow_runs").mock(
return_value=Response(201, json=mock_flow_run.model_dump(mode="json"))
)
cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/create_flow_run").mock(
return_value=Response(201, json=mock_flow_run.model_dump(mode="json"))
)

result = await deployments.run(str(mock_deployment.id))

assert result.id == mock_flow_run.id
assert result.deployment_id == mock_deployment.id


async def test_run_deployment_by_name(
cloud_api: respx.Router,
mock_deployment: DeploymentResponse,
mock_flow_run: FlowRun,
api_url: str,
):
"""run() should create a flow run when given a deployment name"""
mock_flow_run.deployment_id = mock_deployment.id
deployment_name = "my-flow/my-deployment"

cloud_api.get(f"{api_url}/deployments/name/{deployment_name}").mock(
return_value=Response(200, json=mock_deployment.model_dump(mode="json"))
)
cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/flow_runs").mock(
return_value=Response(201, json=mock_flow_run.model_dump(mode="json"))
)
cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/create_flow_run").mock(
return_value=Response(201, json=mock_flow_run.model_dump(mode="json"))
)

result = await deployments.run(deployment_name)

assert result.id == mock_flow_run.id
assert result.deployment_id == mock_deployment.id


async def test_run_deployment_not_found(
cloud_api: respx.Router,
api_url: str,
):
"""run() should raise an appropriate error when the deployment is not found"""
deployment_id = uuid4()
cloud_api.get(f"{api_url}/deployments/{deployment_id}").mock(
return_value=Response(404, json={"detail": "Deployment not found"})
)

with pytest.raises(ObjectNotFound):
await deployments.run(str(deployment_id))
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from uuid import UUID, uuid4

import prefect.main # noqa: F401
import pytest
import respx
from httpx import Response
Expand All @@ -10,12 +11,6 @@

from prefect_cloud import deployments

# isort: split

from prefect.results import ResultRecordMetadata # noqa: F401

FlowRun.model_rebuild()


@pytest.fixture
def account() -> UUID:
Expand Down

0 comments on commit aa4e043

Please sign in to comment.