Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move cli and add tests #29

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ dev = [
]

[project.scripts]
prefect-cloud = "prefect_cloud:cli.app"
prefect-cloud = "prefect_cloud.cli.root:app"

[build-system]
requires = ["hatchling"]
Expand All @@ -51,6 +51,7 @@ build-backend = "hatchling.build"
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
colors = "yes"
testpaths = [
"tests",
]
Expand Down
167 changes: 9 additions & 158 deletions src/prefect_cloud/cli.py → src/prefect_cloud/cli/root.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import asyncio
import functools
import inspect
import traceback
from typing import Any, Callable, List, Optional
from uuid import UUID

import typer
import tzlocal
from click import ClickException
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from rich.text import Text
from rich.theme import Theme

from prefect_cloud import auth, completions, deployments
from prefect_cloud import auth, deployments
from prefect_cloud.cli import completions
from prefect_cloud.cli.utilities import (
PrefectCloudTyper,
exit_with_error,
process_key_value_pairs,
)
from prefect_cloud.dependencies import get_dependencies
from prefect_cloud.github import (
FileNotFound,
Expand All @@ -28,160 +26,13 @@
IntervalSchedule,
RRuleSchedule,
)
from prefect_cloud.utilities.exception import MissingProfileError
from prefect_cloud.utilities.flows import get_parameter_schema_from_content
from prefect_cloud.utilities.tui import redacted


class PrefectCloudTyper(typer.Typer):
"""
Wraps commands created by `Typer` to support async functions and handle errors.
"""

console: Console

def __init__(
self,
*args: Any,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
self.console = Console(
highlight=False,
theme=Theme({"prompt.choices": "bold blue"}),
color_system="auto",
)

def add_typer(
self,
typer_instance: "PrefectCloudTyper",
*args: Any,
no_args_is_help: bool = True,
aliases: Optional[list[str]] = None,
**kwargs: Any,
) -> None:
"""
This will cause help to be default command for all sub apps unless specifically stated otherwise, opposite of before.
"""
if aliases:
for alias in aliases:
super().add_typer(
typer_instance,
*args,
name=alias,
no_args_is_help=no_args_is_help,
hidden=True,
**kwargs,
)

return super().add_typer(
typer_instance, *args, no_args_is_help=no_args_is_help, **kwargs
)

def command(
self,
name: Optional[str] = None,
*args: Any,
aliases: Optional[List[str]] = None,
**kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Create a new command. If aliases are provided, the same command function
will be registered with multiple names.
"""

def wrapper(original_fn: Callable[..., Any]) -> Callable[..., Any]:
# click doesn't support async functions, so we wrap them in
# asyncio.run(). This has the advantage of keeping the function in
# the main thread, which means signal handling works for e.g. the
# server and workers. However, it means that async CLI commands can
# not directly call other async CLI commands (because asyncio.run()
# can not be called nested). In that (rare) circumstance, refactor
# the CLI command so its business logic can be invoked separately
# from its entrypoint.
func = inspect.unwrap(original_fn)

if asyncio.iscoroutinefunction(func):
async_fn = original_fn

@functools.wraps(original_fn)
def sync_fn(*args: Any, **kwargs: Any) -> Any:
return asyncio.run(async_fn(*args, **kwargs))

setattr(sync_fn, "aio", async_fn)
wrapped_fn = sync_fn
else:
wrapped_fn = original_fn

wrapped_fn = with_cli_exception_handling(wrapped_fn)
# register fn with its original name
command_decorator = super(PrefectCloudTyper, self).command(
name=name, *args, **kwargs
)
original_command = command_decorator(wrapped_fn)

# register fn for each alias, e.g. @marvin_app.command(aliases=["r"])
if aliases:
for alias in aliases:
super(PrefectCloudTyper, self).command(
name=alias,
*args,
**{k: v for k, v in kwargs.items() if k != "aliases"},
)(wrapped_fn)

return original_command

return wrapper

def setup_console(self, soft_wrap: bool, prompt: bool) -> None:
self.console = Console(
highlight=False,
color_system="auto",
theme=Theme({"prompt.choices": "bold blue"}),
soft_wrap=not soft_wrap,
force_interactive=prompt,
)


app = PrefectCloudTyper()


def exit_with_error(message: str | Exception, progress: Progress = None):
if progress:
progress.stop()
app.console.print(message, style="red")
raise typer.Exit(1)


def with_cli_exception_handling(fn: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(fn)
def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return fn(*args, **kwargs)
except (typer.Exit, typer.Abort, ClickException):
raise # Do not capture click or typer exceptions
except MissingProfileError as exc:
exit_with_error(exc)
except Exception:
traceback.print_exc()
exit_with_error("An exception occurred.")

return wrapper


def process_key_value_pairs(env: list[str]) -> dict[str, str]:
invalid_pairs = []

for e in env:
if "=" not in e:
invalid_pairs.append(e)

if invalid_pairs:
raise ValueError(f"Invalid key value pairs: {invalid_pairs}")

return {k: v for k, v in [e.split("=") for e in env]}


@app.command()
async def deploy(
function: str,
Expand Down Expand Up @@ -256,8 +107,8 @@ async def deploy(
raw_contents = await get_github_raw_content(github_ref, credentials)
except FileNotFound:
exit_with_error(
"Can't access that file in Github. It either doesn't exist or is private. "
"If it's private repo retry with `--credentials`.",
"Unable to access file in Github. "
"If it's in a private repository retry with `--credentials`.",
progress=progress,
)

Expand Down
161 changes: 161 additions & 0 deletions src/prefect_cloud/cli/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import asyncio
import functools
import inspect
import traceback
from typing import Any, Callable, List, Optional

import typer
from click import ClickException
from rich.console import Console
from rich.progress import Progress
from rich.theme import Theme

from prefect_cloud.utilities.exception import MissingProfileError


def exit_with_error(message: str | Exception, progress: Progress = None):
from prefect_cloud.cli.root import app

if progress:
progress.stop()
app.console.print(message, style="red")
raise typer.Exit(1)


def with_cli_exception_handling(fn: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(fn)
def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return fn(*args, **kwargs)
except (typer.Exit, typer.Abort, ClickException):
raise # Do not capture click or typer exceptions
except MissingProfileError as exc:
exit_with_error(exc)
except Exception:
traceback.print_exc()
exit_with_error("An exception occurred.")

return wrapper


def process_key_value_pairs(env: list[str]) -> dict[str, str]:
invalid_pairs = []

for e in env:
if "=" not in e:
invalid_pairs.append(e)

if invalid_pairs:
raise ValueError(f"Invalid key value pairs: {invalid_pairs}")

return {k: v for k, v in [e.split("=") for e in env]}


class PrefectCloudTyper(typer.Typer):
"""
Wraps commands created by `Typer` to support async functions and handle errors.
"""

console: Console

def __init__(
self,
*args: Any,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
self.console = Console(
highlight=False,
theme=Theme({"prompt.choices": "bold blue"}),
color_system="auto",
)

def add_typer(
self,
typer_instance: "PrefectCloudTyper",
*args: Any,
no_args_is_help: bool = True,
aliases: Optional[list[str]] = None,
**kwargs: Any,
) -> None:
"""
This will cause help to be default command for all sub apps unless specifically stated otherwise, opposite of before.
"""
if aliases:
for alias in aliases:
super().add_typer(
typer_instance,
*args,
name=alias,
no_args_is_help=no_args_is_help,
hidden=True,
**kwargs,
)

return super().add_typer(
typer_instance, *args, no_args_is_help=no_args_is_help, **kwargs
)

def command(
self,
name: Optional[str] = None,
*args: Any,
aliases: Optional[List[str]] = None,
**kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Create a new command. If aliases are provided, the same command function
will be registered with multiple names.
"""

def wrapper(original_fn: Callable[..., Any]) -> Callable[..., Any]:
# click doesn't support async functions, so we wrap them in
# asyncio.run(). This has the advantage of keeping the function in
# the main thread, which means signal handling works for e.g. the
# server and workers. However, it means that async CLI commands can
# not directly call other async CLI commands (because asyncio.run()
# can not be called nested). In that (rare) circumstance, refactor
# the CLI command so its business logic can be invoked separately
# from its entrypoint.
func = inspect.unwrap(original_fn)

if asyncio.iscoroutinefunction(func):
async_fn = original_fn

@functools.wraps(original_fn)
def sync_fn(*args: Any, **kwargs: Any) -> Any:
return asyncio.run(async_fn(*args, **kwargs))

setattr(sync_fn, "aio", async_fn)
wrapped_fn = sync_fn
else:
wrapped_fn = original_fn

wrapped_fn = with_cli_exception_handling(wrapped_fn)
# register fn with its original name
command_decorator = super(PrefectCloudTyper, self).command(
name=name, *args, **kwargs
)
original_command = command_decorator(wrapped_fn)

# register fn for each alias, e.g. @marvin_app.command(aliases=["r"])
if aliases:
for alias in aliases:
super(PrefectCloudTyper, self).command(
name=alias,
*args,
**{k: v for k, v in kwargs.items() if k != "aliases"},
)(wrapped_fn)

return original_command

return wrapper

def setup_console(self, soft_wrap: bool, prompt: bool) -> None:
self.console = Console(
highlight=False,
color_system="auto",
theme=Theme({"prompt.choices": "bold blue"}),
soft_wrap=not soft_wrap,
force_interactive=prompt,
)
Loading