Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uploading Docs to GitHub Pages #28

Merged
merged 11 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Deploy Sphinx documentation to Pages

# Runs on pushes targeting the default branch
on:
push:
branches: [main]
# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:
contents: read
pages: write
id-token: write

# Allow one concurrent deployment
concurrency:
group: "pages"
cancel-in-progress: true

jobs:
pages:
runs-on: ubuntu-20.04
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
permissions:
pages: write
id-token: write
steps:
- id: deployment
uses: sphinx-notes/pages@v3
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,4 @@ __________________

All contributions, bug reports, bug fixes, documentation improvements, enhancements are welcome.

A detailed overview an how to contribute can be found in the [Contributing Guide](https://github.com/astronomer/astro-provider-anyscale/blob/main/CONTRIBUTING.rst)
A detailed overview an how to contribute can be found in the [Contributing Guide](https://github.com/astronomer/astro-provider-anyscale/blob/main/docs/CONTRIBUTING.rst)
4 changes: 0 additions & 4 deletions anyscale_provider/hooks/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ class AnyscaleHook(BaseHook):
"""
This hook handles authenticating and making calls to the Anyscale SDK

.. seealso::
For more information on how to use this hook, take a look at the guide:
:ref:`howto/hook:AnyscaleHook`

:param conn_id: Optional. The connection ID to use for Anyscale. Defaults to "anyscale_default".
"""

Expand Down
60 changes: 52 additions & 8 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ class SubmitAnyscaleJob(BaseOperator):
with the necessary parameters to define and configure the job, and provides mechanisms
for job submission, status tracking, and handling job outcomes.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SubmitAnyscaleJobOperator`

:param conn_id: Required. The connection ID for Anyscale.
:param entrypoint: Required. Command that will be run to execute the job, e.g., `python main.py`.
:param name: Optional. Name of the job. Multiple jobs can be submitted with the same name.
Expand Down Expand Up @@ -111,6 +107,12 @@ def __init__(
self.job_id: str | None = None

def on_kill(self) -> None:
"""
Terminate the Anyscale job if the task is killed.

This method will be called when the task is killed, and it sends a termination
request for the currently running job.
"""
if self.job_id is not None:
self.hook.terminate_job(self.job_id, 5)
self.log.info("Termination request received. Submitted request to terminate the anyscale job.")
Expand All @@ -122,6 +124,16 @@ def hook(self) -> AnyscaleHook:
return AnyscaleHook(conn_id=self.conn_id)

def execute(self, context: Context) -> None:
"""
Execute the job submission to Anyscale.

This method submits the job to Anyscale and handles its initial status.
It defers the execution to a trigger if the job is still running or starting.

:param context: The Airflow context.
:return: The job ID if the job is successfully submitted and completed, or None if the job is deferred.
"""

job_params: dict[str, Any] = {
"entrypoint": self.entrypoint,
"name": self.name,
Expand All @@ -137,6 +149,7 @@ def execute(self, context: Context) -> None:
"project": self.project,
"max_retries": self.max_retries,
}

self.log.info(f"Using Anyscale version {anyscale.__version__}")
# Submit the job to Anyscale
job_config = JobConfig(**job_params)
Expand Down Expand Up @@ -169,6 +182,16 @@ def execute(self, context: Context) -> None:
raise Exception(f"Unexpected state `{current_state}` for job_id `{self.job_id}`.")

def execute_complete(self, context: Context, event: Any) -> None:
"""
Complete the execution of the job based on the trigger event.

This method is called when the trigger fires and provides the final status
of the job. It raises an exception if the job failed.

:param context: The Airflow context.
:param event: The event data from the trigger.
:return: None
"""
current_job_id = event["job_id"]

if event["state"] == JobState.FAILED:
Expand All @@ -186,10 +209,6 @@ class RolloutAnyscaleService(BaseOperator):
configurations and options. It ensures the service is rolled out according to the
specified parameters and handles the deployment lifecycle.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:RolloutAnyscaleServiceOperator`

:param conn_id: Required. The connection ID for Anyscale.
:param name: Required. Unique name of the service.
:param image_uri: Optional. URI of an existing image. Exclusive with `containerfile`.
Expand Down Expand Up @@ -299,12 +318,27 @@ def hook(self) -> AnyscaleHook:
return AnyscaleHook(conn_id=self.conn_id)

def on_kill(self) -> None:
"""
Terminate the Anyscale service rollout if the task is killed.

This method will be called when the task is killed, and it sends a termination
request for the currently running service rollout.
"""
if self.name is not None:
self.hook.terminate_service(self.name, 5)
self.log.info("Termination request received. Submitted request to terminate the anyscale service rollout.")
return

def execute(self, context: Context) -> str | None:
"""
Execute the service rollout to Anyscale.

This method deploys the service to Anyscale with the provided configuration
and parameters. It defers the execution to a trigger if the service is in progress.

:param context: The Airflow context.
:return: The service ID if the rollout is successfully initiated, or None if the job is deferred.
"""
service_params = {
"name": self.name,
"image_uri": self.image_uri,
Expand Down Expand Up @@ -354,6 +388,16 @@ def execute(self, context: Context) -> str | None:
)

def execute_complete(self, context: Context, event: Any) -> None:
"""
Complete the execution of the service rollout based on the trigger event.

This method is called when the trigger fires and provides the final status
of the service rollout. It raises an exception if the rollout failed.

:param context: The Airflow context.
:param event: The event data from the trigger.
:return: None
"""
service_name = event["service_name"]
state = event["state"]

Expand Down
57 changes: 47 additions & 10 deletions anyscale_provider/triggers/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ class AnyscaleJobTrigger(BaseTrigger):
yields events based on the job's status. It handles timeouts and errors during
the polling process.

.. seealso::
For more information on how to use this trigger, take a look at the guide:
:ref:`howto/trigger:AnyscaleJobTrigger`

:param conn_id: Required. The connection ID for Anyscale.
:param job_id: Required. The ID of the job to monitor.
:param poll_interval: Optional. Interval in seconds between status checks. Defaults to 60 seconds.
Expand All @@ -38,10 +34,19 @@ def __init__(self, conn_id: str, job_id: str, poll_interval: float = 60, fetch_l

@cached_property
def hook(self) -> AnyscaleHook:
"""Return an instance of the AnyscaleHook."""
"""
Return an instance of the AnyscaleHook.

:return: AnyscaleHook instance configured with the provided connection ID.
"""
return AnyscaleHook(conn_id=self.conn_id)

def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Serialize the trigger configuration for persistence.

:return: A tuple containing the path to the trigger class and a dictionary of the trigger's parameters.
"""
return (
"anyscale_provider.triggers.anyscale.AnyscaleJobTrigger",
{
Expand All @@ -52,7 +57,11 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Monitor the job status periodically until a terminal state is reached or an error occurs.

:yield: TriggerEvent indicating the current status of the job.
"""
try:
# Loop until reach the terminal state
# TODO: Make this call async
Expand Down Expand Up @@ -90,6 +99,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)

def _is_terminal_state(self, job_id: str) -> bool:
"""
Check if the job has reached a terminal state.

:param job_id: The ID of the job to check the status for.
:return: True if the job is in a terminal state, False otherwise.
"""
job_state = self.hook.get_job_status(job_id).state
self.log.info(f"Current job state for {job_id} is: {job_state}")
return job_state not in (JobState.STARTING, JobState.RUNNING)
Expand All @@ -103,10 +118,6 @@ class AnyscaleServiceTrigger(BaseTrigger):
and yields events based on the service's status. It handles timeouts and errors
during the monitoring process.

.. seealso::
For more information on how to use this trigger, take a look at the guide:
:ref:`howto/trigger:AnyscaleServiceTrigger`

:param conn_id: Required. The connection ID for Anyscale.
:param service_name: Required. The ID of the service to monitor.
:param expected_state: Required. The expected final state of the service.
Expand All @@ -130,10 +141,19 @@ def __init__(

@cached_property
def hook(self) -> AnyscaleHook:
"""Return an instance of the AnyscaleHook."""
"""
Return an instance of the AnyscaleHook.

:return: AnyscaleHook instance configured with the provided connection ID.
"""
return AnyscaleHook(conn_id=self.conn_id)

def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Serialize the trigger configuration for persistence.

:return: A tuple containing the path to the trigger class and a dictionary of the trigger's parameters.
"""
return (
"anyscale_provider.triggers.anyscale.AnyscaleServiceTrigger",
{
Expand All @@ -146,6 +166,11 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Monitor the service status periodically until the expected state is reached or an error occurs.

:yield: TriggerEvent indicating the current status of the service.
"""
self.log.info(
f"Monitoring service {self.service_name} every {self.poll_interval} seconds to reach {self.expected_state}"
)
Expand Down Expand Up @@ -181,6 +206,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)

def _get_current_state(self, service_name: str) -> str:
"""
Get the current status of the specified service.

:param service_name: The name of the service to check the status for.
:return: The current status of the service.
"""
service_status = self.hook.get_service_status(service_name)

if self.canary_percent is None or self.canary_percent == 100.0:
Expand All @@ -192,6 +223,12 @@ def _get_current_state(self, service_name: str) -> str:
return str(service_status.state)

def _check_current_state(self, service_name: str) -> bool:
"""
Check if the service is still in a transitional state.

:param service_name: The name of the service to check the status for.
:return: True if the service is in a transitional state, False otherwise.
"""
service_state = self._get_current_state(service_name)
self.log.info(f"Current service state for {service_name} is: {service_state}")
return service_state in (
Expand Down
Loading
Loading