diff --git a/pyproject.toml b/pyproject.toml index fb2a29d..5173b13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ dev = [ ] [project.scripts] -prefect-cloud = "prefect_cloud:cli.app" +prefect-cloud = "prefect_cloud.cli.root:app" [build-system] requires = ["hatchling"] @@ -51,6 +51,7 @@ build-backend = "hatchling.build" [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "session" +colors = "yes" testpaths = [ "tests", ] diff --git a/src/prefect_cloud/pull_steps/__init__.py b/src/prefect_cloud/cli/__init__.py similarity index 100% rename from src/prefect_cloud/pull_steps/__init__.py rename to src/prefect_cloud/cli/__init__.py diff --git a/src/prefect_cloud/completions.py b/src/prefect_cloud/cli/completions.py similarity index 100% rename from src/prefect_cloud/completions.py rename to src/prefect_cloud/cli/completions.py diff --git a/src/prefect_cloud/cli.py b/src/prefect_cloud/cli/root.py similarity index 67% rename from src/prefect_cloud/cli.py rename to src/prefect_cloud/cli/root.py index 46f1157..4b13781 100644 --- a/src/prefect_cloud/cli.py +++ b/src/prefect_cloud/cli/root.py @@ -1,20 +1,18 @@ -import asyncio -import functools -import inspect -import traceback -from typing import Any, Callable, List, Optional from uuid import UUID import typer import tzlocal -from click import ClickException -from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table from rich.text import Text -from rich.theme import Theme -from prefect_cloud import auth, completions, deployments +from prefect_cloud import auth, deployments +from prefect_cloud.cli import completions +from prefect_cloud.cli.utilities import ( + PrefectCloudTyper, + exit_with_error, + process_key_value_pairs, +) from prefect_cloud.dependencies import get_dependencies from prefect_cloud.github import ( FileNotFound, @@ -28,160 +26,13 @@ IntervalSchedule, RRuleSchedule, ) -from prefect_cloud.utilities.exception import MissingProfileError from prefect_cloud.utilities.flows import get_parameter_schema_from_content from prefect_cloud.utilities.tui import redacted -class PrefectCloudTyper(typer.Typer): - """ - Wraps commands created by `Typer` to support async functions and handle errors. - """ - - console: Console - - def __init__( - self, - *args: Any, - **kwargs: Any, - ): - super().__init__(*args, **kwargs) - self.console = Console( - highlight=False, - theme=Theme({"prompt.choices": "bold blue"}), - color_system="auto", - ) - - def add_typer( - self, - typer_instance: "PrefectCloudTyper", - *args: Any, - no_args_is_help: bool = True, - aliases: Optional[list[str]] = None, - **kwargs: Any, - ) -> None: - """ - This will cause help to be default command for all sub apps unless specifically stated otherwise, opposite of before. - """ - if aliases: - for alias in aliases: - super().add_typer( - typer_instance, - *args, - name=alias, - no_args_is_help=no_args_is_help, - hidden=True, - **kwargs, - ) - - return super().add_typer( - typer_instance, *args, no_args_is_help=no_args_is_help, **kwargs - ) - - def command( - self, - name: Optional[str] = None, - *args: Any, - aliases: Optional[List[str]] = None, - **kwargs: Any, - ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - """ - Create a new command. If aliases are provided, the same command function - will be registered with multiple names. - """ - - def wrapper(original_fn: Callable[..., Any]) -> Callable[..., Any]: - # click doesn't support async functions, so we wrap them in - # asyncio.run(). This has the advantage of keeping the function in - # the main thread, which means signal handling works for e.g. the - # server and workers. However, it means that async CLI commands can - # not directly call other async CLI commands (because asyncio.run() - # can not be called nested). In that (rare) circumstance, refactor - # the CLI command so its business logic can be invoked separately - # from its entrypoint. - func = inspect.unwrap(original_fn) - - if asyncio.iscoroutinefunction(func): - async_fn = original_fn - - @functools.wraps(original_fn) - def sync_fn(*args: Any, **kwargs: Any) -> Any: - return asyncio.run(async_fn(*args, **kwargs)) - - setattr(sync_fn, "aio", async_fn) - wrapped_fn = sync_fn - else: - wrapped_fn = original_fn - - wrapped_fn = with_cli_exception_handling(wrapped_fn) - # register fn with its original name - command_decorator = super(PrefectCloudTyper, self).command( - name=name, *args, **kwargs - ) - original_command = command_decorator(wrapped_fn) - - # register fn for each alias, e.g. @marvin_app.command(aliases=["r"]) - if aliases: - for alias in aliases: - super(PrefectCloudTyper, self).command( - name=alias, - *args, - **{k: v for k, v in kwargs.items() if k != "aliases"}, - )(wrapped_fn) - - return original_command - - return wrapper - - def setup_console(self, soft_wrap: bool, prompt: bool) -> None: - self.console = Console( - highlight=False, - color_system="auto", - theme=Theme({"prompt.choices": "bold blue"}), - soft_wrap=not soft_wrap, - force_interactive=prompt, - ) - - app = PrefectCloudTyper() -def exit_with_error(message: str | Exception, progress: Progress = None): - if progress: - progress.stop() - app.console.print(message, style="red") - raise typer.Exit(1) - - -def with_cli_exception_handling(fn: Callable[..., Any]) -> Callable[..., Any]: - @functools.wraps(fn) - def wrapper(*args: Any, **kwargs: Any) -> Any: - try: - return fn(*args, **kwargs) - except (typer.Exit, typer.Abort, ClickException): - raise # Do not capture click or typer exceptions - except MissingProfileError as exc: - exit_with_error(exc) - except Exception: - traceback.print_exc() - exit_with_error("An exception occurred.") - - return wrapper - - -def process_key_value_pairs(env: list[str]) -> dict[str, str]: - invalid_pairs = [] - - for e in env: - if "=" not in e: - invalid_pairs.append(e) - - if invalid_pairs: - raise ValueError(f"Invalid key value pairs: {invalid_pairs}") - - return {k: v for k, v in [e.split("=") for e in env]} - - @app.command() async def deploy( function: str, @@ -256,8 +107,8 @@ async def deploy( raw_contents = await get_github_raw_content(github_ref, credentials) except FileNotFound: exit_with_error( - "Can't access that file in Github. It either doesn't exist or is private. " - "If it's private repo retry with `--credentials`.", + "Unable to access file in Github. " + "If it's in a private repository retry with `--credentials`.", progress=progress, ) diff --git a/src/prefect_cloud/cli/utilities.py b/src/prefect_cloud/cli/utilities.py new file mode 100644 index 0000000..bb62acf --- /dev/null +++ b/src/prefect_cloud/cli/utilities.py @@ -0,0 +1,161 @@ +import asyncio +import functools +import inspect +import traceback +from typing import Any, Callable, List, Optional + +import typer +from click import ClickException +from rich.console import Console +from rich.progress import Progress +from rich.theme import Theme + +from prefect_cloud.utilities.exception import MissingProfileError + + +def exit_with_error(message: str | Exception, progress: Progress = None): + from prefect_cloud.cli.root import app + + if progress: + progress.stop() + app.console.print(message, style="red") + raise typer.Exit(1) + + +def with_cli_exception_handling(fn: Callable[..., Any]) -> Callable[..., Any]: + @functools.wraps(fn) + def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return fn(*args, **kwargs) + except (typer.Exit, typer.Abort, ClickException): + raise # Do not capture click or typer exceptions + except MissingProfileError as exc: + exit_with_error(exc) + except Exception: + traceback.print_exc() + exit_with_error("An exception occurred.") + + return wrapper + + +def process_key_value_pairs(env: list[str]) -> dict[str, str]: + invalid_pairs = [] + + for e in env: + if "=" not in e: + invalid_pairs.append(e) + + if invalid_pairs: + raise ValueError(f"Invalid key value pairs: {invalid_pairs}") + + return {k: v for k, v in [e.split("=") for e in env]} + + +class PrefectCloudTyper(typer.Typer): + """ + Wraps commands created by `Typer` to support async functions and handle errors. + """ + + console: Console + + def __init__( + self, + *args: Any, + **kwargs: Any, + ): + super().__init__(*args, **kwargs) + self.console = Console( + highlight=False, + theme=Theme({"prompt.choices": "bold blue"}), + color_system="auto", + ) + + def add_typer( + self, + typer_instance: "PrefectCloudTyper", + *args: Any, + no_args_is_help: bool = True, + aliases: Optional[list[str]] = None, + **kwargs: Any, + ) -> None: + """ + This will cause help to be default command for all sub apps unless specifically stated otherwise, opposite of before. + """ + if aliases: + for alias in aliases: + super().add_typer( + typer_instance, + *args, + name=alias, + no_args_is_help=no_args_is_help, + hidden=True, + **kwargs, + ) + + return super().add_typer( + typer_instance, *args, no_args_is_help=no_args_is_help, **kwargs + ) + + def command( + self, + name: Optional[str] = None, + *args: Any, + aliases: Optional[List[str]] = None, + **kwargs: Any, + ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """ + Create a new command. If aliases are provided, the same command function + will be registered with multiple names. + """ + + def wrapper(original_fn: Callable[..., Any]) -> Callable[..., Any]: + # click doesn't support async functions, so we wrap them in + # asyncio.run(). This has the advantage of keeping the function in + # the main thread, which means signal handling works for e.g. the + # server and workers. However, it means that async CLI commands can + # not directly call other async CLI commands (because asyncio.run() + # can not be called nested). In that (rare) circumstance, refactor + # the CLI command so its business logic can be invoked separately + # from its entrypoint. + func = inspect.unwrap(original_fn) + + if asyncio.iscoroutinefunction(func): + async_fn = original_fn + + @functools.wraps(original_fn) + def sync_fn(*args: Any, **kwargs: Any) -> Any: + return asyncio.run(async_fn(*args, **kwargs)) + + setattr(sync_fn, "aio", async_fn) + wrapped_fn = sync_fn + else: + wrapped_fn = original_fn + + wrapped_fn = with_cli_exception_handling(wrapped_fn) + # register fn with its original name + command_decorator = super(PrefectCloudTyper, self).command( + name=name, *args, **kwargs + ) + original_command = command_decorator(wrapped_fn) + + # register fn for each alias, e.g. @marvin_app.command(aliases=["r"]) + if aliases: + for alias in aliases: + super(PrefectCloudTyper, self).command( + name=alias, + *args, + **{k: v for k, v in kwargs.items() if k != "aliases"}, + )(wrapped_fn) + + return original_command + + return wrapper + + def setup_console(self, soft_wrap: bool, prompt: bool) -> None: + self.console = Console( + highlight=False, + color_system="auto", + theme=Theme({"prompt.choices": "bold blue"}), + soft_wrap=not soft_wrap, + force_interactive=prompt, + ) diff --git a/src/prefect_cloud/pull_steps/flowify.py b/src/prefect_cloud/pull_steps/flowify.py deleted file mode 100644 index b2ee245..0000000 --- a/src/prefect_cloud/pull_steps/flowify.py +++ /dev/null @@ -1,78 +0,0 @@ -import ast -import sys -from pathlib import Path -from typing import Any - - -class FlowDecorator(ast.NodeTransformer): - def __init__(self, target_name: str): - self.modified = False - self.target_name = target_name - - def is_flow_decorator(self, node: ast.expr) -> bool: - """Check if a decorator node is a flow decorator.""" - if isinstance(node, ast.Call): - return isinstance(node.func, ast.Name) and node.func.id == "flow" - return isinstance(node, ast.Name) and node.id == "flow" - - def visit_FunctionDef(self, node: ast.FunctionDef) -> Any: - if not self.modified and node.name == self.target_name: - # Check if function already has a flow decorator - has_flow = any(self.is_flow_decorator(dec) for dec in node.decorator_list) - - if not has_flow: - self.modified = True - # Create @flow(log_prints=True) decorator - flow_decorator = ast.Call( - func=ast.Name(id="flow", ctx=ast.Load()), - args=[], - keywords=[ - ast.keyword(arg="log_prints", value=ast.Constant(value=True)) - ], - ) - node.decorator_list.append(flow_decorator) - return node - - -def add_flow_decorator(content: str, function_name: str) -> str: - """Adds @flow decorator to specified function and returns modified content.""" - tree = ast.parse(content) - - # Add import if it doesn't exist - has_flow_import = any( - isinstance(node, ast.ImportFrom) - and node.module == "prefect" - and any(name.name == "flow" for name in node.names) - for node in tree.body - ) - - if not has_flow_import: - import_flow = ast.ImportFrom( - module="prefect", names=[ast.alias(name="flow", asname=None)], level=0 - ) - tree.body.insert(0, import_flow) - - transformer = FlowDecorator(function_name) - modified_tree = transformer.visit(tree) - - return ast.unparse(modified_tree) - - -def main() -> None: - if len(sys.argv) != 3: - print("Usage: flowify.py ") - sys.exit(1) - - file_path = Path(sys.argv[1]) - function_name = sys.argv[2] - - content = file_path.read_text() - - modified_content = add_flow_decorator(content, function_name) - - file_path.write_text(modified_content) - print(f"Modified {file_path}") - - -if __name__ == "__main__": - main() diff --git a/src/prefect_cloud/schemas/__init__.py b/src/prefect_cloud/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_cli/test_root.py b/tests/test_cli/test_root.py new file mode 100644 index 0000000..d3f1a87 --- /dev/null +++ b/tests/test_cli/test_root.py @@ -0,0 +1,450 @@ +from __future__ import annotations + +import contextlib +import re +import textwrap +from typing import Iterable +from unittest.mock import AsyncMock, MagicMock, patch + +import readchar +from rich.console import Console +from typer.testing import CliRunner, Result + +from prefect_cloud.cli.root import app +from prefect_cloud.github import FileNotFound + + +def check_contains(cli_result: Result, content: str, should_contain: bool) -> None: + """ + Utility function to see if content is or is not in a CLI result. + + Args: + should_contain: if True, checks that content is in cli_result, + if False, checks that content is not in cli_result + """ + output = cli_result.stdout.strip() + content = textwrap.dedent(content).strip() + + if should_contain: + section_heading = "------ desired content ------" + else: + section_heading = "------ undesired content ------" + + print(section_heading) + print(content) + print() + + if len(content) > 20: + display_content = content[:20] + "..." + else: + display_content = content + + if should_contain: + assert content in output, ( + f"Desired contents {display_content!r} not found in CLI output" + ) + else: + assert content not in output, ( + f"Undesired contents {display_content!r} found in CLI output" + ) + + +def invoke_and_assert( + command: str | list[str], + user_input: str | None = None, + prompts_and_responses: list[tuple[str, str] | tuple[str, str, str]] | None = None, + expected_output: str | None = None, + expected_output_contains: str | Iterable[str] | None = None, + expected_output_does_not_contain: str | Iterable[str] | None = None, + expected_line_count: int | None = None, + expected_code: int | None = 0, + echo: bool = True, + temp_dir: str | None = None, +) -> Result: + """ + Test utility for the Prefect CLI application, asserts exact match with CLI output. + + Args: + command: Command passed to the Typer CliRunner + user_input: User input passed to the Typer CliRunner when running interactive + commands. + expected_output: Used when you expect the CLI output to be an exact match with + the provided text. + expected_output_contains: Used when you expect the CLI output to contain the + string or strings. + expected_output_does_not_contain: Used when you expect the CLI output to not + contain the string or strings. + expected_code: 0 if we expect the app to exit cleanly, else 1 if we expect + the app to exit with an error. + temp_dir: if provided, the CLI command will be run with this as its present + working directory. + """ + prompts_and_responses = prompts_and_responses or [] + runner = CliRunner() + if temp_dir: + ctx = runner.isolated_filesystem(temp_dir=temp_dir) + else: + ctx = contextlib.nullcontext() + + if user_input and prompts_and_responses: + raise ValueError("Cannot provide both user_input and prompts_and_responses") + + if prompts_and_responses: + user_input = ( + ("\n".join(response for (_, response, *_) in prompts_and_responses) + "\n") + .replace("↓", readchar.key.DOWN) + .replace("↑", readchar.key.UP) + ) + + with ctx: + result = runner.invoke(app, command, catch_exceptions=False, input=user_input) + + if echo: + print("\n------ CLI output ------") + print(result.stdout) + + if expected_code is not None: + assertion_error_message = ( + f"Expected code {expected_code} but got {result.exit_code}\n" + "Output from CLI command:\n" + "-----------------------\n" + f"{result.stdout}" + ) + assert result.exit_code == expected_code, assertion_error_message + + if expected_output is not None: + output = result.stdout.strip() + expected_output = textwrap.dedent(expected_output).strip() + + compare_string = ( + "------ expected ------\n" + f"{expected_output}\n" + "------ actual ------\n" + f"{output}\n" + "------ end ------\n" + ) + assert output == expected_output, compare_string + + if prompts_and_responses: + output = result.stdout.strip() + cursor = 0 + + for item in prompts_and_responses: + prompt = item[0] + selected_option = item[2] if len(item) == 3 else None + + prompt_re = rf"{re.escape(prompt)}.*?" + if not selected_option: + # If we're not prompting for a table, then expect that the + # prompt ends with a colon. + prompt_re += ":" + + match = re.search(prompt_re, output[cursor:]) + if not match: + raise AssertionError(f"Prompt '{prompt}' not found in CLI output") + cursor = cursor + match.end() + + if selected_option: + option_re = re.escape(f"│ > │ {selected_option}") + match = re.search(option_re, output[cursor:]) + if not match: + raise AssertionError( + f"Option '{selected_option}' not found after prompt '{prompt}'" + ) + cursor = cursor + match.end() + + if expected_output_contains is not None: + if isinstance(expected_output_contains, str): + check_contains(result, expected_output_contains, should_contain=True) + else: + for contents in expected_output_contains: + check_contains(result, contents, should_contain=True) + + if expected_output_does_not_contain is not None: + if isinstance(expected_output_does_not_contain, str): + check_contains( + result, expected_output_does_not_contain, should_contain=False + ) + else: + for contents in expected_output_does_not_contain: + check_contains(result, contents, should_contain=False) + + if expected_line_count is not None: + line_count = len(result.stdout.splitlines()) + assert expected_line_count == line_count, ( + f"Expected {expected_line_count} lines of CLI output, only" + f" {line_count} lines present" + ) + + return result + + +@contextlib.contextmanager +def temporary_console_width(console: Console, width: int): + original = console.width + + try: + console._width = width # type: ignore + yield + finally: + console._width = original # type: ignore + + +def test_deploy_command_basic(): + """Test basic deployment without running""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + # Setup mock client and responses + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + # Mock auth responses + client.ensure_managed_work_pool = AsyncMock(return_value="test-pool") + client.create_managed_deployment = AsyncMock(return_value="test-deployment-id") + + # Mock auth.get_cloud_urls_or_login + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + # Mock GitHub content retrieval + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.return_value = textwrap.dedent(""" + def test_function(): + pass + """).lstrip() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + ], + expected_code=0, + expected_output_contains=[ + "View deployment here", + "Run it with: ", + "prefect-cloud run test_function/test_function", + ], + ) + + # Verify the deployment was created with expected args + client.create_managed_deployment.assert_called_once() + call_args = client.create_managed_deployment.call_args[0] + assert call_args[0] == "test_function" # deployment name + assert call_args[1] == "test.py" # filepath + assert call_args[2] == "test_function" # function name + assert call_args[3] == "test-pool" # work pool + + +def test_deploy_and_run(): + """Test deployment with immediate run""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + # Mock necessary responses + client.ensure_managed_work_pool = AsyncMock(return_value="test-pool") + client.create_managed_deployment = AsyncMock(return_value="test-deployment-id") + client.create_flow_run_from_deployment_id = AsyncMock( + return_value=MagicMock(id="test-run-id", name="test-run") + ) + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.return_value = textwrap.dedent(""" + def test_function(x: int): + pass + """).lstrip() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + "--run", + "--parameters", + "x=1", + ], + expected_code=0, + expected_output_contains=[ + "View deployment here", + "View flow run here", + ], + ) + + # Verify flow run was created + client.create_flow_run_from_deployment_id.assert_called_once_with( + "test-deployment-id", {"x": "1"} + ) + + +def test_deploy_private_repo_without_credentials(): + """Test deployment fails appropriately when accessing private repo without credentials""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.side_effect = FileNotFound() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + ], + expected_code=1, + expected_output_contains=("Unable to access file in Github."), + ) + + +def test_deploy_with_env_vars(): + """Test deployment with environment variables""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + client.ensure_managed_work_pool = AsyncMock(return_value="test-pool") + client.create_managed_deployment = AsyncMock(return_value="test-deployment-id") + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.return_value = textwrap.dedent(""" + def test_function(): + pass + """).lstrip() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + "--env", + "API_KEY=secret", + "--env", + "DEBUG=true", + ], + expected_code=0, + expected_output_contains="View deployment here", + ) + + # Verify environment variables were passed correctly + client.create_managed_deployment.assert_called_once() + job_variables = client.create_managed_deployment.call_args[1][ + "job_variables" + ] + assert job_variables["env"]["API_KEY"] == "secret" + assert job_variables["env"]["DEBUG"] == "true" + + +def test_deploy_with_private_repo_credentials(): + """Test deployment with credentials for private repository""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + client.ensure_managed_work_pool = AsyncMock(return_value="test-pool") + client.create_managed_deployment = AsyncMock(return_value="test-deployment-id") + client.create_credentials_secret = AsyncMock() + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.return_value = textwrap.dedent(""" + def test_function(): + pass + """).lstrip() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + "--credentials", + "github_token", + ], + expected_code=0, + expected_output_contains="View deployment here", + ) + + # Verify credentials were stored + client.create_credentials_secret.assert_called_once_with( + "owner-repo-credentials", "github_token" + ) + + +def test_deploy_invalid_parameters(): + """Test deployment fails with invalid parameter format""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + "--run", + "--parameters", + "invalid_param", # Missing = sign + ], + expected_code=1, + expected_output_contains="Invalid key value pairs", + ) + + +def test_deploy_function_not_found(): + """Test deployment fails when function doesn't exist in file""" + with patch("prefect_cloud.auth.get_prefect_cloud_client") as mock_client: + client = AsyncMock() + mock_client.return_value.__aenter__.return_value = client + + with patch("prefect_cloud.cli.root.auth.get_cloud_urls_or_login") as mock_urls: + mock_urls.return_value = ("https://ui.url", "https://api.url", "test-key") + + with patch("prefect_cloud.cli.root.get_github_raw_content") as mock_content: + mock_content.return_value = textwrap.dedent(""" + def other_function(): + pass + """).lstrip() + + invoke_and_assert( + command=[ + "deploy", + "test_function", + "--from", + "https://github.com/owner/repo/blob/main/test.py", + "--with", + "prefect", + ], + expected_code=1, + expected_output_contains="Could not find function 'test_function'", + )