Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds deletecommand and remove flag. #48

Merged
merged 3 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions src/prefect_cloud/cli/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def run(
)


@app.command(rich_help_panel="Manage Deployments")
@app.command(rich_help_panel="Deploy")
async def schedule(
deployment: str = typer.Argument(
...,
Expand Down Expand Up @@ -292,7 +292,20 @@ async def schedule(
await deployments.schedule(deployment, schedule, func_kwargs)


@app.command(rich_help_panel="Manage Deployments")
@app.command(rich_help_panel="Deploy")
async def unschedule(
deployment: str = typer.Argument(
...,
help="Name or ID of the deployment to remove schedules from",
),
):
"""
Remove deployment schedules
"""
await deployments.schedule(deployment, "none")


@app.command(rich_help_panel="Deploy")
async def ls():
"""
List all deployments
Expand Down Expand Up @@ -349,32 +362,18 @@ def describe_schedule(schedule: DeploymentSchedule) -> Text:
)


@app.command(rich_help_panel="Manage Deployments")
async def pause(
deployment: str = typer.Argument(
...,
help="Name or ID of the deployment to pause",
autocompletion=completions.complete_deployment,
),
):
"""
Pause a scheduled deployment
"""
await deployments.pause(deployment)


@app.command(rich_help_panel="Manage Deployments")
async def resume(
@app.command(rich_help_panel="Deploy")
async def delete(
deployment: str = typer.Argument(
...,
help="Name or ID of the deployment to resume",
help="Name or ID of the deployment to delete",
autocompletion=completions.complete_deployment,
),
):
"""
Resume a paused deployment
Delete a deployment
"""
await deployments.resume(deployment)
await deployments.delete(deployment)


@app.command(rich_help_panel="Auth")
Expand Down
25 changes: 9 additions & 16 deletions src/prefect_cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ async def create_deployment(

return UUID(deployment_id)

async def delete_deployment(self, deployment_id: "UUID"):
try:
await self.request("DELETE", f"/deployments/{deployment_id}")
except HTTPStatusError as e:
if e.response.status_code == 404:
raise ObjectNotFound(http_exc=e) from e
else:
raise

async def create_block_document(
self,
block_document: "BlockDocument | BlockDocumentCreate",
Expand Down Expand Up @@ -800,22 +809,6 @@ async def create_credentials_secret(self, name: str, credentials: str):
)
)

async def pause_deployment(self, deployment_id: UUID):
deployment = await self.read_deployment(deployment_id)

for schedule in deployment.schedules:
await self.update_deployment_schedule_active(
deployment.id, schedule.id, active=False
)

async def resume_deployment(self, deployment_id: UUID):
deployment = await self.read_deployment(deployment_id)

for schedule in deployment.schedules:
await self.update_deployment_schedule_active(
deployment.id, schedule.id, active=True
)

async def get_default_base_job_template_for_managed_work_pool(
self,
) -> Optional[Dict[str, Any]]:
Expand Down
21 changes: 7 additions & 14 deletions src/prefect_cloud/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ async def _get_deployment(deployment_: str) -> DeploymentResponse:
return await client.read_deployment(deployment_id)


async def delete(deployment_: str):
deployment = await _get_deployment(deployment_)

async with await get_prefect_cloud_client() as client:
await client.delete_deployment(deployment.id)


async def run(deployment_: str) -> DeploymentFlowRun:
deployment = await _get_deployment(deployment_)

Expand Down Expand Up @@ -74,17 +81,3 @@ async def schedule(
await client.create_deployment_schedule(
deployment.id, new_schedule, True, parameters
)


async def pause(deployment_: str):
deployment = await _get_deployment(deployment_)

async with await get_prefect_cloud_client() as client:
await client.pause_deployment(deployment.id)


async def resume(deployment_: str):
deployment = await _get_deployment(deployment_)

async with await get_prefect_cloud_client() as client:
await client.resume_deployment(deployment.id)
26 changes: 0 additions & 26 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,32 +197,6 @@ async def test_create_deployment_schedule(
assert result.schedule == schedule


async def test_pause_deployment(
client: PrefectCloudClient,
mock_deployment: DeploymentResponse,
respx_mock: respx.Router,
):
schedule = DeploymentSchedule(
id=uuid4(),
deployment_id=mock_deployment.id,
schedule=CronSchedule(cron="0 0 * * *", timezone="UTC"),
active=True,
)
mock_deployment.schedules = [schedule]

respx_mock.get(f"{PREFECT_API_URL}/deployments/{mock_deployment.id}").mock(
return_value=Response(200, json=mock_deployment.model_dump(mode="json"))
)
patch_route = respx_mock.patch(
f"{PREFECT_API_URL}/deployments/{mock_deployment.id}/schedules/{schedule.id}"
).mock(return_value=Response(204))

await client.pause_deployment(mock_deployment.id)

assert patch_route.called
assert patch_route.calls.last.request.content == b'{"active":false}'


async def test_get_default_base_job_template_for_managed_work_pool(
client: PrefectCloudClient,
respx_mock: respx.Router,
Expand Down
104 changes: 60 additions & 44 deletions tests/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from httpx import Response

from prefect_cloud import deployments
from prefect_cloud.client import ObjectNotFound
from prefect_cloud.schemas.objects import (
CronSchedule,
DeploymentFlowRun,
Expand Down Expand Up @@ -215,50 +216,6 @@ async def test_schedule_none_removes_all_schedules(
assert len(cloud_api.calls) == 2 # Only get and delete, no create


async def test_pause_deployment(
cloud_api: respx.Router,
mock_deployment_with_schedule: DeploymentResponse,
api_url: str,
):
cloud_api.get(f"{api_url}/deployments/{mock_deployment_with_schedule.id}").mock(
return_value=Response(
200, json=mock_deployment_with_schedule.model_dump(mode="json")
)
)
patch_route = cloud_api.patch(
f"{api_url}"
f"/deployments/{mock_deployment_with_schedule.id}"
f"/schedules/{mock_deployment_with_schedule.schedules[0].id}"
).mock(return_value=Response(204))

await deployments.pause(str(mock_deployment_with_schedule.id))

assert patch_route.called
assert patch_route.calls.last.request.content == b'{"active":false}'


async def test_resume_deployment(
cloud_api: respx.Router,
mock_deployment_with_schedule: DeploymentResponse,
api_url: str,
):
cloud_api.get(f"{api_url}/deployments/{mock_deployment_with_schedule.id}").mock(
return_value=Response(
200, json=mock_deployment_with_schedule.model_dump(mode="json")
)
)
patch_route = cloud_api.patch(
f"{api_url}"
f"/deployments/{mock_deployment_with_schedule.id}"
f"/schedules/{mock_deployment_with_schedule.schedules[0].id}"
).mock(return_value=Response(204))

await deployments.resume(str(mock_deployment_with_schedule.id))

assert patch_route.called
assert patch_route.calls.last.request.content == b'{"active":true}'


async def test_list_returns_empty_context_when_no_deployments(
cloud_api: respx.Router, api_url: str
):
Expand Down Expand Up @@ -350,3 +307,62 @@ async def test_schedule_accepts_parameters(
assert schedule_route.called
request_body = schedule_route.calls.last.request.read().decode()
assert '"parameters":{"key":"value"}' in request_body


async def test_delete_deployment(
cloud_api: respx.Router, mock_deployment: DeploymentResponse, api_url: str
):
"""Test that a deployment can be deleted"""
# Mock the GET request to verify deployment exists
cloud_api.get(f"{api_url}/deployments/{mock_deployment.id}").mock(
return_value=Response(200, json=mock_deployment.model_dump(mode="json"))
)

# Mock the DELETE request
delete_route = cloud_api.delete(f"{api_url}/deployments/{mock_deployment.id}").mock(
return_value=Response(204)
)

await deployments.delete(str(mock_deployment.id))

assert delete_route.called
assert (
delete_route.calls.last.request.url
== f"{api_url}/deployments/{mock_deployment.id}"
)


async def test_delete_deployment_by_name(
cloud_api: respx.Router, mock_deployment: DeploymentResponse, api_url: str
):
"""Test that a deployment can be deleted using flow_name/deployment_name format"""
# Mock the GET request for name lookup
cloud_api.get(f"{api_url}/deployments/name/my-flow/my-deployment").mock(
return_value=Response(200, json=mock_deployment.model_dump(mode="json"))
)

# Mock the DELETE request
delete_route = cloud_api.delete(f"{api_url}/deployments/{mock_deployment.id}").mock(
return_value=Response(204)
)

await deployments.delete("my-flow/my-deployment")

assert delete_route.called
assert (
delete_route.calls.last.request.url
== f"{api_url}/deployments/{mock_deployment.id}"
)


async def test_delete_nonexistent_deployment(cloud_api: respx.Router, api_url: str):
"""Test that deleting a nonexistent deployment raises ObjectNotFound"""
deployment_id = "11111111-1111-1111-1111-111111111111"

# Mock 404 response for nonexistent deployment
cloud_api.get(f"{api_url}/deployments/{deployment_id}").mock(
return_value=Response(404, json={"detail": "Deployment not found"})
)

with pytest.raises(ObjectNotFound):
await deployments.delete(deployment_id)