-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathanyscale_job.py
49 lines (40 loc) · 1.25 KB
/
anyscale_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from datetime import datetime, timedelta
from pathlib import Path
from airflow import DAG
from anyscale_provider.operators.anyscale import SubmitAnyscaleJob
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 4, 2),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
# Constants
FOLDER_PATH = Path(__file__).parent / "ray_scripts"
dag = DAG(
"sample_anyscale_job_workflow",
default_args=default_args,
description="A DAG to interact with Anyscale triggered manually",
schedule=None, # This DAG is not scheduled, only triggered manually
catchup=False,
)
submit_anyscale_job = SubmitAnyscaleJob(
task_id="submit_anyscale_job",
conn_id=ANYSCALE_CONN_ID,
name="AstroJob",
image_uri="anyscale/image/airflow-integration-testing:1",
compute_config="airflow-integration-testing:1",
working_dir=str(FOLDER_PATH),
entrypoint="python ray-job.py",
requirements=["requests", "pandas", "numpy", "torch"],
max_retries=1,
job_timeout_seconds=3000,
poll_interval=30,
dag=dag,
)
# Defining the task sequence
submit_anyscale_job