From e14621339525d2d3ab59ff4ae8cda02a4d9dd714 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Thu, 6 Feb 2025 15:03:03 -0500 Subject: [PATCH] Adds `prefect run [deployment]` Closes ENG-1293 --- src/prefect_cloud/cli.py | 26 +++- src/prefect_cloud/client.py | 20 +-- src/prefect_cloud/deployments.py | 7 + tests/test_run.py | 132 ++++++++++++++++++ ...loyments_schedule.py => test_schedules.py} | 7 +- 5 files changed, 167 insertions(+), 25 deletions(-) create mode 100644 tests/test_run.py rename tests/{test_deployments_schedule.py => test_schedules.py} (98%) diff --git a/src/prefect_cloud/cli.py b/src/prefect_cloud/cli.py index 2a54869..dc81759 100644 --- a/src/prefect_cloud/cli.py +++ b/src/prefect_cloud/cli.py @@ -16,10 +16,7 @@ from rich.text import Text from prefect_cloud import auth, deployments -from prefect_cloud.client import ( - get_cloud_api_url, - get_prefect_cloud_client, -) +from prefect_cloud.client import get_prefect_cloud_client from prefect_cloud.dependencies import get_dependencies from prefect_cloud.github import ( FileNotFound, @@ -134,6 +131,8 @@ async def deploy( # TODO: put back flowify if this a public repo? need to figure that out. ] + _, api_url, _ = auth.get_cloud_urls_or_login() + deployment_id = await client.create_managed_deployment( deployment_name, github_ref.filepath, @@ -143,7 +142,7 @@ async def deploy( parameter_schema, job_variables={ "pip_packages": get_dependencies(dependencies or []), - "env": {"PREFECT_CLOUD_API_URL": get_cloud_api_url()} | env_vars, + "env": {"PREFECT_CLOUD_API_URL": api_url} | env_vars, }, ) @@ -161,6 +160,23 @@ async def deploy( ) +@app.command() +async def run( + deployment: str = typer.Argument( + ..., + help="The deployment to run (either its name or ID).", + ), +): + ui_url, _, _ = auth.get_cloud_urls_or_login() + flow_run = await deployments.run(deployment) + flow_run_url = f"{ui_url}/runs/flow-run/{flow_run.id}" + app.console.print( + f"Flow run [bold]{flow_run.name}[/bold] [dim]({flow_run.id})[/dim] created", + f"and will begin running soon.\nView its progress " + f"[link={flow_run_url}]on Prefect Cloud[/link].", + ) + + @app.command() async def ls(): context = await deployments.list() diff --git a/src/prefect_cloud/client.py b/src/prefect_cloud/client.py index df06f97..570148f 100644 --- a/src/prefect_cloud/client.py +++ b/src/prefect_cloud/client.py @@ -1,3 +1,4 @@ +from logging import getLogger from typing import Any from uuid import UUID @@ -7,11 +8,11 @@ BlockDocumentUpdate, WorkPoolCreate, ) -from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterType -from prefect.exceptions import ObjectNotFound -from prefect.settings import ( - PREFECT_API_URL, +from prefect.client.schemas.filters import ( + WorkPoolFilter, + WorkPoolFilterType, ) +from prefect.exceptions import ObjectNotFound from prefect.utilities.callables import ParameterSchema from prefect.workers.utilities import ( get_default_base_job_template_for_infrastructure_type, @@ -22,16 +23,7 @@ PREFECT_MANAGED = "prefect:managed" - -# TODO: temporary remove -def get_cloud_api_url(): - url = PREFECT_API_URL.value() - if url.startswith("https://api.prefect.dev/api"): - return "https://api.prefect.dev/api" - elif url.startswith("https://api.stg.prefect.dev/api"): - return "https://api.stg.prefect.dev/api" - else: - return "https://api.prefect.cloud/api" +logger = getLogger(__name__) class PrefectCloudClient(PrefectClient): diff --git a/src/prefect_cloud/deployments.py b/src/prefect_cloud/deployments.py index a48d300..f872ab3 100644 --- a/src/prefect_cloud/deployments.py +++ b/src/prefect_cloud/deployments.py @@ -64,6 +64,13 @@ async def _get_deployment(deployment_: str) -> DeploymentResponse: return await client.read_deployment(deployment_id) +async def run(deployment_: str) -> FlowRun: + deployment = await _get_deployment(deployment_) + + async with get_prefect_cloud_client() as client: + return await client.create_flow_run_from_deployment(deployment.id) + + async def schedule(deployment_: str, schedule: str): deployment = await _get_deployment(deployment_) diff --git a/tests/test_run.py b/tests/test_run.py new file mode 100644 index 0000000..6fb77b7 --- /dev/null +++ b/tests/test_run.py @@ -0,0 +1,132 @@ +from uuid import UUID, uuid4 + +import prefect.main # noqa: F401 +import pytest +import respx +from httpx import Response +from prefect.client.schemas.objects import FlowRun +from prefect.client.schemas.responses import DeploymentResponse +from prefect.exceptions import ObjectNotFound + +from prefect_cloud import deployments + + +@pytest.fixture +def account() -> UUID: + return UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + + +@pytest.fixture +def workspace() -> UUID: + return UUID("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb") + + +@pytest.fixture +def api_url(account: UUID, workspace: UUID) -> str: + return f"https://api.prefect.cloud/api/accounts/{account}/workspaces/{workspace}" + + +@pytest.fixture(autouse=True) +def mock_get_cloud_urls_or_login( + monkeypatch: pytest.MonkeyPatch, account: UUID, workspace: UUID, api_url: str +): + def mock_urls(): + return ( + f"https://app.prefect.cloud/account/{account}/workspace/{workspace}", + api_url, + "test_api_key", + ) + + monkeypatch.setattr("prefect_cloud.auth.get_cloud_urls_or_login", mock_urls) + + +@pytest.fixture +def mock_deployment() -> DeploymentResponse: + return DeploymentResponse( + id=uuid4(), + flow_id=uuid4(), + name="test-deployment", + schedules=[], + is_schedule_active=True, + created=None, + updated=None, + ) + + +@pytest.fixture +def mock_flow_run() -> FlowRun: + return FlowRun( + id=uuid4(), + deployment_id=uuid4(), + flow_id=uuid4(), + name="test-run", + state_type="SCHEDULED", + expected_start_time=None, + created=None, + updated=None, + ) + + +async def test_run_deployment_by_id( + cloud_api: respx.Router, + mock_deployment: DeploymentResponse, + mock_flow_run: FlowRun, + api_url: str, +): + """run() should create a flow run when given a deployment ID""" + mock_flow_run.deployment_id = mock_deployment.id + + cloud_api.get(f"{api_url}/deployments/{mock_deployment.id}").mock( + return_value=Response(200, json=mock_deployment.model_dump(mode="json")) + ) + cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/flow_runs").mock( + return_value=Response(201, json=mock_flow_run.model_dump(mode="json")) + ) + cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/create_flow_run").mock( + return_value=Response(201, json=mock_flow_run.model_dump(mode="json")) + ) + + result = await deployments.run(str(mock_deployment.id)) + + assert result.id == mock_flow_run.id + assert result.deployment_id == mock_deployment.id + + +async def test_run_deployment_by_name( + cloud_api: respx.Router, + mock_deployment: DeploymentResponse, + mock_flow_run: FlowRun, + api_url: str, +): + """run() should create a flow run when given a deployment name""" + mock_flow_run.deployment_id = mock_deployment.id + deployment_name = "my-flow/my-deployment" + + cloud_api.get(f"{api_url}/deployments/name/{deployment_name}").mock( + return_value=Response(200, json=mock_deployment.model_dump(mode="json")) + ) + cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/flow_runs").mock( + return_value=Response(201, json=mock_flow_run.model_dump(mode="json")) + ) + cloud_api.post(f"{api_url}/deployments/{mock_deployment.id}/create_flow_run").mock( + return_value=Response(201, json=mock_flow_run.model_dump(mode="json")) + ) + + result = await deployments.run(deployment_name) + + assert result.id == mock_flow_run.id + assert result.deployment_id == mock_deployment.id + + +async def test_run_deployment_not_found( + cloud_api: respx.Router, + api_url: str, +): + """run() should raise an appropriate error when the deployment is not found""" + deployment_id = uuid4() + cloud_api.get(f"{api_url}/deployments/{deployment_id}").mock( + return_value=Response(404, json={"detail": "Deployment not found"}) + ) + + with pytest.raises(ObjectNotFound): + await deployments.run(str(deployment_id)) diff --git a/tests/test_deployments_schedule.py b/tests/test_schedules.py similarity index 98% rename from tests/test_deployments_schedule.py rename to tests/test_schedules.py index f05cb8f..2e64052 100644 --- a/tests/test_deployments_schedule.py +++ b/tests/test_schedules.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone from uuid import UUID, uuid4 +import prefect.main # noqa: F401 import pytest import respx from httpx import Response @@ -10,12 +11,6 @@ from prefect_cloud import deployments -# isort: split - -from prefect.results import ResultRecordMetadata # noqa: F401 - -FlowRun.model_rebuild() - @pytest.fixture def account() -> UUID: