From 180725c9d9a8c191af82392b2203eb930e28a881 Mon Sep 17 00:00:00 2001 From: jakekaplan <40362401+jakekaplan@users.noreply.github.com> Date: Tue, 4 Feb 2025 15:36:31 -0500 Subject: [PATCH] Support `--credentials` (#7) * add credentials * fix cli * fix readme * support pre-commit --- README.md | 55 ++++++++++--------------------------- src/prefect_cloud/cli.py | 53 +++++++++++++++++++++++++++++------ src/prefect_cloud/client.py | 38 ++++++++++++++++++++++++- src/prefect_cloud/github.py | 41 ++++++++++++++++++--------- 4 files changed, 124 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index e63e11a..9c93ab3 100644 --- a/README.md +++ b/README.md @@ -13,59 +13,34 @@ $ curl -LsSf https://astral.sh/uv/install.sh | sh $ uvx prefect cloud login ``` -## Grab your workflow -Pick a Python file with the function(s) you want to deploy (or use the example below). -The function you specify will be automatically converted into a Prefect `@flow` if it isn't already. +## Deploy your workflow from github -```python -$ cat << 'EOF' > example_workflow.py -def get_message(): - return "Hello, World!" - -def hello_world(): - print(get_message()) - -def greet_user(name: str, exclaim: bool = False): - message = f"Hello, {name}" - if exclaim: - message += "!" - print(message) -EOF +```shell +$ uvx prefect-cloud deploy FUNCTION_NAME --from PYTHON_FILE_URL_IN_REPO ``` - -## Deploy - -### From local source: -Store your code with prefect cloud for easy deployment. -```bash -$ uvx prefect-cloud deploy example_workflow.py hello_world +For example: +```shell +$ uvx prefect-cloud deploy hello_world --from https://github.com/jakekaplan/demo-flows/blob/main/hello_world.py ``` -With additional code files or directories: -```bash -# files -$ ... --include "file1.py" --include "/path/to/file2.py" -# directories -$ ... --include "/dir1/" --include "/path/to/dir2/" -``` - -### From github -```bash -$ uvx prefect-cloud git-deploy https://github.com/jakekaplan/demo-flows/blob/main/hello_world.py hello_world +### From a Private Repo +```shell +# private repo +$ uvx prefect-cloud deploy FUNCTION_NAME --from PYTHON_FILE_URL_IN_REPO --credentials GITHUB_TOKEN ``` ### With dependencies: ```bash # a package -$ ... --dependencies pandas +$ uvx prefect-cloud deploy ... --from ... --dependencies pandas # multiple packages -$ ... --dependencies "pandas,numpy" +$ uvx prefect-cloud deploy ... --from ... --dependencies "pandas,numpy" # requirements file -$ ... --dependencies /path/to/requirements.txt +$ uvx prefect-cloud deploy ... --from ... --dependencies /path/to/requirements.txt # pyproject.toml -$ ... --dependencies /path/to/pyproject.toml +$ uvx prefect-cloud deploy ... --from ... --dependencies /path/to/pyproject.toml ``` ### With environment variables: ```bash -$ ... --env ENV_VAR1=VALUE1 --env ENV_VAR2=VALUE2 +$ uvx prefect-cloud deploy ... --from ... --env ENV_VAR1=VALUE1 --env ENV_VAR2=VALUE2 ``` diff --git a/src/prefect_cloud/cli.py b/src/prefect_cloud/cli.py index 9f4d823..1290b4a 100644 --- a/src/prefect_cloud/cli.py +++ b/src/prefect_cloud/cli.py @@ -1,13 +1,18 @@ import typer from rich.progress import Progress, SpinnerColumn, TextColumn -from prefect.cli._utilities import exit_with_error +from prefect.cli._utilities import exit_with_error as _exit_with_error from prefect.cli.root import PrefectTyper from prefect.client.base import ServerType, determine_server_type from prefect.utilities.urls import url_for from prefect_cloud.dependencies import get_dependencies -from prefect_cloud.github import GitHubFileRef, get_github_raw_content +from prefect_cloud.github import ( + GitHubFileRef, + get_github_raw_content, + FileNotFound, + to_pull_step, +) from prefect_cloud.client import ( get_cloud_api_url, get_prefect_cloud_client, @@ -17,6 +22,12 @@ app = PrefectTyper() +def exit_with_error(message: str, progress: Progress = None): + if progress: + progress.stop() + _exit_with_error(message) + + def ensure_prefect_cloud(): if determine_server_type() != ServerType.CLOUD: exit_with_error("Not logged into Prefect Cloud! Run `uvx prefect cloud login`.") @@ -39,23 +50,33 @@ def process_key_value_pairs(env: list[str]) -> dict[str, str]: async def deploy( function: str, file: str = typer.Option( - None, + ..., "--from", "-f", help=".py file containing the function to deploy.", ), dependencies: list[str] = typer.Option( + ..., "--with", "-d", help="Dependencies to include. Can be a single package `--with prefect`, " "multiple packages `--with prefect --with pandas`, " "the path to a requirements or pyproject.toml file " "`--with requirements.txt / pyproject.toml`.", + default_factory=list, ), env: list[str] = typer.Option( + ..., "--env", "-e", help="Environment variables to set in the format KEY=VALUE. Can be specified multiple times.", + default_factory=list, + ), + credentials: str | None = typer.Option( + None, + "--credentials", + "-c", + help="Optional credentials if code is in a private repository. ", ), ): ensure_prefect_cloud() @@ -68,30 +89,44 @@ async def deploy( try: env_vars = process_key_value_pairs(env) if env else {} except ValueError as e: - exit_with_error(str(e)) + exit_with_error(str(e), progress=progress) async with get_prefect_cloud_client() as client: - task = progress.add_task("Inspecting code in github..", total=None) + task = progress.add_task("Inspecting code...", total=None) github_ref = GitHubFileRef.from_url(file) - raw_contents = await get_github_raw_content(github_ref) + try: + raw_contents = await get_github_raw_content(github_ref, credentials) + except FileNotFound: + exit_with_error( + "Can't access that file in Github. It either doesn't exist or is private. " + "If it's private repo retry with `--credentials`.", + progress=progress, + ) try: parameter_schema = get_parameter_schema_from_content( raw_contents, function ) except ValueError: exit_with_error( - f"Could not find function '{function}' in {github_ref.filepath}" + f"Could not find function '{function}' in {github_ref.filepath}", + progress=progress, ) - progress.update(task, description="Ensuring work pool exists...") + progress.update(task, description="Confirming work pool exists...") work_pool = await client.ensure_managed_work_pool() progress.update(task, description="Deploying flow...") deployment_name = f"{function}_deployment" + credentials_name = None + if credentials: + progress.update(task, description="Syncing credentials...") + credentials_name = f"{github_ref.owner}-{github_ref.repo}-credentials" + await client.create_credentials_secret(credentials_name, credentials) + pull_steps = [ - github_ref.pull_step, + to_pull_step(github_ref, credentials_name) # TODO: put back flowify if this a public repo? need to figure that out. ] diff --git a/src/prefect_cloud/client.py b/src/prefect_cloud/client.py index e61be6a..eea967b 100644 --- a/src/prefect_cloud/client.py +++ b/src/prefect_cloud/client.py @@ -2,7 +2,12 @@ from uuid import UUID, uuid4 from prefect.client.orchestration import PrefectClient -from prefect.client.schemas.actions import WorkPoolCreate +from prefect.exceptions import ObjectNotFound +from prefect.client.schemas.actions import ( + BlockDocumentCreate, + BlockDocumentUpdate, + WorkPoolCreate, +) from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterType from prefect.settings import ( PREFECT_API_KEY, @@ -115,6 +120,37 @@ async def set_deployment_id(self, storage_id: UUID, deployment_id: UUID): json={"deployment_id": str(deployment_id)}, ) + async def create_credentials_secret(self, name: str, credentials: str): + try: + existing_block = await self.read_block_document_by_name( + name, block_type_slug="secret" + ) + await self.update_block_document( + block_document_id=existing_block.id, + block_document=BlockDocumentUpdate( + data={ + "value": credentials, + }, + ), + ) + except ObjectNotFound: + secret_block_type = await self.read_block_type_by_slug("secret") + secret_block_schema = ( + await self.get_most_recent_block_schema_for_block_type( + block_type_id=secret_block_type.id + ) + ) + await self.create_block_document( + block_document=BlockDocumentCreate( + name=name, + data={ + "value": credentials, + }, + block_type_id=secret_block_type.id, + block_schema_id=secret_block_schema.id, + ) + ) + def get_prefect_cloud_client(): return PrefectCloudClient( diff --git a/src/prefect_cloud/github.py b/src/prefect_cloud/github.py index 4a116a8..c0065a3 100644 --- a/src/prefect_cloud/github.py +++ b/src/prefect_cloud/github.py @@ -6,6 +6,10 @@ from httpx import AsyncClient +class FileNotFound(Exception): + pass + + @dataclass class GitHubFileRef: """Reference to a file in a GitHub repository.""" @@ -74,25 +78,36 @@ def raw_url(self) -> str: """Get the raw.githubusercontent.com URL for this file.""" return f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/refs/heads/{self.branch}/{self.filepath}" - @property - def pull_step(self) -> dict[str, Any]: - return { - "prefect.deployments.steps.git_clone": { - "repository": self.clone_url, - "branch": self.branch, - # TODO: support access token - } - } - def __str__(self) -> str: return f"github.com/{self.owner}/{self.repo} @ {self.branch} - {self.filepath}" -async def get_github_raw_content(github_ref: GitHubFileRef) -> str: +def to_pull_step( + github_ref: GitHubFileRef, credentials_block: str | None = None +) -> dict[str, Any]: + pull_step_kwargs = { + "repository": github_ref.clone_url, + "branch": github_ref.branch, + } + if credentials_block: + pull_step_kwargs["access_token"] = ( + "{{ prefect.blocks.secret." + credentials_block + " }}" + ) + + return {"prefect.deployments.steps.git_clone": pull_step_kwargs} + + +async def get_github_raw_content( + github_ref: GitHubFileRef, credentials: str | None = None +) -> str: """Get raw content of a file from GitHub.""" + headers = {} + if credentials: + headers["Authorization"] = f"Bearer {credentials}" + async with AsyncClient() as client: - response = await client.get(github_ref.raw_url) + response = await client.get(github_ref.raw_url, headers=headers) if response.status_code == 404: - raise ValueError(f"File not found: {github_ref.filepath}") + raise FileNotFound(f"File not found: {github_ref}") response.raise_for_status() return response.text