Skip to content

Commit

Permalink
Support --credentials (#7)
Browse files Browse the repository at this point in the history
* add credentials

* fix cli

* fix readme

* support pre-commit
  • Loading branch information
jakekaplan authored Feb 4, 2025
1 parent adc815b commit 180725c
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 63 deletions.
55 changes: 15 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,34 @@ $ curl -LsSf https://astral.sh/uv/install.sh | sh
$ uvx prefect cloud login
```

## Grab your workflow
Pick a Python file with the function(s) you want to deploy (or use the example below).
The function you specify will be automatically converted into a Prefect `@flow` if it isn't already.
## Deploy your workflow from github

```python
$ cat << 'EOF' > example_workflow.py
def get_message():
return "Hello, World!"

def hello_world():
print(get_message())

def greet_user(name: str, exclaim: bool = False):
message = f"Hello, {name}"
if exclaim:
message += "!"
print(message)
EOF
```shell
$ uvx prefect-cloud deploy FUNCTION_NAME --from PYTHON_FILE_URL_IN_REPO
```

## Deploy

### From local source:
Store your code with prefect cloud for easy deployment.
```bash
$ uvx prefect-cloud deploy example_workflow.py hello_world
For example:
```shell
$ uvx prefect-cloud deploy hello_world --from https://github.com/jakekaplan/demo-flows/blob/main/hello_world.py
```
With additional code files or directories:
```bash
# files
$ ... --include "file1.py" --include "/path/to/file2.py"
# directories
$ ... --include "/dir1/" --include "/path/to/dir2/"
```

### From github
```bash
$ uvx prefect-cloud git-deploy https://github.com/jakekaplan/demo-flows/blob/main/hello_world.py hello_world
### From a Private Repo
```shell
# private repo
$ uvx prefect-cloud deploy FUNCTION_NAME --from PYTHON_FILE_URL_IN_REPO --credentials GITHUB_TOKEN
```

### With dependencies:
```bash
# a package
$ ... --dependencies pandas
$ uvx prefect-cloud deploy ... --from ... --dependencies pandas
# multiple packages
$ ... --dependencies "pandas,numpy"
$ uvx prefect-cloud deploy ... --from ... --dependencies "pandas,numpy"
# requirements file
$ ... --dependencies /path/to/requirements.txt
$ uvx prefect-cloud deploy ... --from ... --dependencies /path/to/requirements.txt
# pyproject.toml
$ ... --dependencies /path/to/pyproject.toml
$ uvx prefect-cloud deploy ... --from ... --dependencies /path/to/pyproject.toml
```

### With environment variables:
```bash
$ ... --env ENV_VAR1=VALUE1 --env ENV_VAR2=VALUE2
$ uvx prefect-cloud deploy ... --from ... --env ENV_VAR1=VALUE1 --env ENV_VAR2=VALUE2
```
53 changes: 44 additions & 9 deletions src/prefect_cloud/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import typer
from rich.progress import Progress, SpinnerColumn, TextColumn

from prefect.cli._utilities import exit_with_error
from prefect.cli._utilities import exit_with_error as _exit_with_error
from prefect.cli.root import PrefectTyper
from prefect.client.base import ServerType, determine_server_type
from prefect.utilities.urls import url_for

from prefect_cloud.dependencies import get_dependencies
from prefect_cloud.github import GitHubFileRef, get_github_raw_content
from prefect_cloud.github import (
GitHubFileRef,
get_github_raw_content,
FileNotFound,
to_pull_step,
)
from prefect_cloud.client import (
get_cloud_api_url,
get_prefect_cloud_client,
Expand All @@ -17,6 +22,12 @@
app = PrefectTyper()


def exit_with_error(message: str, progress: Progress = None):
if progress:
progress.stop()
_exit_with_error(message)


def ensure_prefect_cloud():
if determine_server_type() != ServerType.CLOUD:
exit_with_error("Not logged into Prefect Cloud! Run `uvx prefect cloud login`.")
Expand All @@ -39,23 +50,33 @@ def process_key_value_pairs(env: list[str]) -> dict[str, str]:
async def deploy(
function: str,
file: str = typer.Option(
None,
...,
"--from",
"-f",
help=".py file containing the function to deploy.",
),
dependencies: list[str] = typer.Option(
...,
"--with",
"-d",
help="Dependencies to include. Can be a single package `--with prefect`, "
"multiple packages `--with prefect --with pandas`, "
"the path to a requirements or pyproject.toml file "
"`--with requirements.txt / pyproject.toml`.",
default_factory=list,
),
env: list[str] = typer.Option(
...,
"--env",
"-e",
help="Environment variables to set in the format KEY=VALUE. Can be specified multiple times.",
default_factory=list,
),
credentials: str | None = typer.Option(
None,
"--credentials",
"-c",
help="Optional credentials if code is in a private repository. ",
),
):
ensure_prefect_cloud()
Expand All @@ -68,30 +89,44 @@ async def deploy(
try:
env_vars = process_key_value_pairs(env) if env else {}
except ValueError as e:
exit_with_error(str(e))
exit_with_error(str(e), progress=progress)

async with get_prefect_cloud_client() as client:
task = progress.add_task("Inspecting code in github..", total=None)
task = progress.add_task("Inspecting code...", total=None)

github_ref = GitHubFileRef.from_url(file)
raw_contents = await get_github_raw_content(github_ref)
try:
raw_contents = await get_github_raw_content(github_ref, credentials)
except FileNotFound:
exit_with_error(
"Can't access that file in Github. It either doesn't exist or is private. "
"If it's private repo retry with `--credentials`.",
progress=progress,
)
try:
parameter_schema = get_parameter_schema_from_content(
raw_contents, function
)
except ValueError:
exit_with_error(
f"Could not find function '{function}' in {github_ref.filepath}"
f"Could not find function '{function}' in {github_ref.filepath}",
progress=progress,
)

progress.update(task, description="Ensuring work pool exists...")
progress.update(task, description="Confirming work pool exists...")
work_pool = await client.ensure_managed_work_pool()

progress.update(task, description="Deploying flow...")
deployment_name = f"{function}_deployment"

credentials_name = None
if credentials:
progress.update(task, description="Syncing credentials...")
credentials_name = f"{github_ref.owner}-{github_ref.repo}-credentials"
await client.create_credentials_secret(credentials_name, credentials)

pull_steps = [
github_ref.pull_step,
to_pull_step(github_ref, credentials_name)
# TODO: put back flowify if this a public repo? need to figure that out.
]

Expand Down
38 changes: 37 additions & 1 deletion src/prefect_cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from uuid import UUID, uuid4

from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.exceptions import ObjectNotFound
from prefect.client.schemas.actions import (
BlockDocumentCreate,
BlockDocumentUpdate,
WorkPoolCreate,
)
from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterType
from prefect.settings import (
PREFECT_API_KEY,
Expand Down Expand Up @@ -115,6 +120,37 @@ async def set_deployment_id(self, storage_id: UUID, deployment_id: UUID):
json={"deployment_id": str(deployment_id)},
)

async def create_credentials_secret(self, name: str, credentials: str):
try:
existing_block = await self.read_block_document_by_name(
name, block_type_slug="secret"
)
await self.update_block_document(
block_document_id=existing_block.id,
block_document=BlockDocumentUpdate(
data={
"value": credentials,
},
),
)
except ObjectNotFound:
secret_block_type = await self.read_block_type_by_slug("secret")
secret_block_schema = (
await self.get_most_recent_block_schema_for_block_type(
block_type_id=secret_block_type.id
)
)
await self.create_block_document(
block_document=BlockDocumentCreate(
name=name,
data={
"value": credentials,
},
block_type_id=secret_block_type.id,
block_schema_id=secret_block_schema.id,
)
)


def get_prefect_cloud_client():
return PrefectCloudClient(
Expand Down
41 changes: 28 additions & 13 deletions src/prefect_cloud/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from httpx import AsyncClient


class FileNotFound(Exception):
pass


@dataclass
class GitHubFileRef:
"""Reference to a file in a GitHub repository."""
Expand Down Expand Up @@ -74,25 +78,36 @@ def raw_url(self) -> str:
"""Get the raw.githubusercontent.com URL for this file."""
return f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/refs/heads/{self.branch}/{self.filepath}"

@property
def pull_step(self) -> dict[str, Any]:
return {
"prefect.deployments.steps.git_clone": {
"repository": self.clone_url,
"branch": self.branch,
# TODO: support access token
}
}

def __str__(self) -> str:
return f"github.com/{self.owner}/{self.repo} @ {self.branch} - {self.filepath}"


async def get_github_raw_content(github_ref: GitHubFileRef) -> str:
def to_pull_step(
github_ref: GitHubFileRef, credentials_block: str | None = None
) -> dict[str, Any]:
pull_step_kwargs = {
"repository": github_ref.clone_url,
"branch": github_ref.branch,
}
if credentials_block:
pull_step_kwargs["access_token"] = (
"{{ prefect.blocks.secret." + credentials_block + " }}"
)

return {"prefect.deployments.steps.git_clone": pull_step_kwargs}


async def get_github_raw_content(
github_ref: GitHubFileRef, credentials: str | None = None
) -> str:
"""Get raw content of a file from GitHub."""
headers = {}
if credentials:
headers["Authorization"] = f"Bearer {credentials}"

async with AsyncClient() as client:
response = await client.get(github_ref.raw_url)
response = await client.get(github_ref.raw_url, headers=headers)
if response.status_code == 404:
raise ValueError(f"File not found: {github_ref.filepath}")
raise FileNotFound(f"File not found: {github_ref}")
response.raise_for_status()
return response.text

0 comments on commit 180725c

Please sign in to comment.