diff --git a/examples/metajob-example/README.md b/examples/dag-example/README.md similarity index 84% rename from examples/metajob-example/README.md rename to examples/dag-example/README.md index 0e4c03357a..6916bc3888 100644 --- a/examples/metajob-example/README.md +++ b/examples/dag-example/README.md @@ -1,18 +1,18 @@ Overview -------- -Meta Jobs allow VDK users to schedule jobs in a directed acyclic graph. +DAGs allow VDK users to schedule jobs in a directed acyclic graph. This means that jobs can be configured to run only when a set of previous jobs have finished successfully. In this example we will use the Versatile Data Kit to develop four Data jobs - two of these jobs will read data from separate json files, and will subsequently insert the data into Trino tables. The third job will read the data -inserted by the previous two jobs, and will print the data to the terminal. The fourth Data Job will be a Meta Job +inserted by the previous two jobs, and will print the data to the terminal. The fourth Data Job will be a VDK DAG which will manage the other three and ensure that the third job runs only when the previous two finish successfully. -The Meta Job uses a separate job input object separate from the one usually used for job +The DAG uses a job input object separate from the one usually used for job operations in VDK Data Jobs and must be imported. -The graph for our Meta Job will look like this: -![Meta Job graph](images/metajob-graph.png) +The graph for our DAG will look like this: +![DAG graph](images/dag-graph.png) Before you continue, make sure you are familiar with the [Getting Started](https://github.com/vmware/versatile-data-kit/wiki/Getting-Started) section of the wiki. @@ -23,9 +23,9 @@ Code The relevant Data Job code is available [here](https://github.com/vmware/versatile-data-kit/tree/main/examples). -You can follow along and run this Meta Job on your machine; +You can follow along and run this DAG on your machine; alternatively, you can use the available code as a template and extend it to -make a Meta Job that fits your use case more closely. +make a DAG that fits your use case more closely. Data -------- @@ -40,7 +40,7 @@ To run this example, you need * Versatile Data Kit * Trino DB * `vdk-trino` - VDK plugin for a connection to a Trino database -* `vdk-meta-jobs` - VDK plugin for Meta Job functionality +* `vdk-dag` - VDK plugin for DAG functionality Configuration ------------- @@ -48,7 +48,7 @@ If you have not done so already, you can install Versatile Data Kit and the plugins required for this example by running the following commands from a terminal: ```console pip install quickstart-vdk -pip install vdk-meta-jobs +pip install vdk-dag ``` Note that Versatile Data Kit requires Python 3.7+. See the [Installation page](https://github.com/vmware/versatile-data-kit/wiki/Installation#install-sdk) for more details.
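If you want to double-check the installation before continuing, a couple of standard commands are enough (an optional sanity check; the versions printed will differ on your machine):

```console
# Confirm both packages are installed in the active Python environment.
pip show quickstart-vdk vdk-dag

# The vdk CLI should be available on the PATH after installation.
vdk --help
```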
@@ -75,7 +75,7 @@ vdk server --install Data Jobs -------- -Our three Datajobs have the following structure: +Our three Data jobs have the following structure: ``` ingest-job1/ @@ -90,7 +90,7 @@ ingest-job1/ 01_drop_table.sql ```sql -drop table if exists memory.default.test_metajob_one +drop table if exists memory.default.test_dag_one ``` @@ -114,12 +114,12 @@ def run(job_input: IJobInput): rows = [tuple(i.values()) for i in data] insert_query = """ - INSERT INTO memory.default.test_metajob_one VALUES + INSERT INTO memory.default.test_dag_one VALUES """ + ", ".join(str(i) for i in rows) job_input.execute_query( """ - CREATE TABLE IF NOT EXISTS memory.default.test_metajob_one + CREATE TABLE IF NOT EXISTS memory.default.test_dag_one ( id varchar, first_name varchar, @@ -187,7 +187,7 @@ ingest-job2/ 01_drop_table.sql ```sql -drop table if exists memory.default.test_metajob_two +drop table if exists memory.default.test_dag_two ``` @@ -211,12 +211,12 @@ def run(job_input: IJobInput): rows = [tuple(i.values()) for i in data] insert_query = """ - INSERT INTO memory.default.test_metajob_two VALUES + INSERT INTO memory.default.test_dag_two VALUES """ + ", ".join(str(i) for i in rows) job_input.execute_query( """ - CREATE TABLE IF NOT EXISTS memory.default.test_metajob_two + CREATE TABLE IF NOT EXISTS memory.default.test_dag_two ( id integer, first_name varchar, @@ -289,10 +289,10 @@ from vdk.api.job_input import IJobInput def run(job_input: IJobInput): job1_data = job_input.execute_query( - "SELECT * FROM memory.default.test_metajob_one" + "SELECT * FROM memory.default.test_dag_one" ) job2_data = job_input.execute_query( - "SELECT * FROM memory.default.test_metajob_two" + "SELECT * FROM memory.default.test_dag_two" ) print( @@ -305,7 +305,7 @@ def run(job_input: IJobInput): 20_drop_table_one.sql ```sql -drop table if exists memory.default.test_metajob_one +drop table if exists memory.default.test_dag_one ``` @@ -313,7 +313,7 @@ drop table if exists memory.default.test_metajob_one 30_drop_table_two.sql ```sql -drop table if exists memory.default.test_metajob_two +drop table if exists memory.default.test_dag_two ``` @@ -347,48 +347,48 @@ vdk-trino ``` -example-meta-job/ -├── example_meta_job.py +example-dag/ +├── example_dag.py ├── config.ini ├── requirements.txt ```
- example_meta_job.py + example_dag.py ```python -from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput +from vdk.plugin.dag.dag_runner import DagInput JOBS_RUN_ORDER = [ { "job_name": "ingest-job1", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": [] }, { "job_name": "ingest-job2", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": [] }, { "job_name": "read-data-job", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": ["ingest-job1", "ingest-job2"] }, ] def run(job_input): - MetaJobInput().run_meta_job(JOBS_RUN_ORDER) + DagInput().run_dag(JOBS_RUN_ORDER) ```
-Note that the `run_meta_job` method belongs to the `MetaJobInput` object which must be imported +Note that the `run_dag` method belongs to the `DagInput` object which must be imported and instantiated separately from the default `IJobInput` object which is passed to the `run` function by default.
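To make the distinction concrete, here is a minimal, self-contained sketch of the same pattern; it reuses the field names from the example above and trims the graph to a single job purely for illustration:

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.dag.dag_runner import DagInput

# A one-job graph, kept tiny just to illustrate the shape of the input.
JOBS_RUN_ORDER = [
    {
        "job_name": "ingest-job1",
        "team_name": "my-team",
        "fail_dag_on_error": True,
        "depends_on": [],
    },
]


def run(job_input: IJobInput):
    # job_input is the standard object VDK passes to every step; it is not
    # involved in orchestration here. DagInput is instantiated explicitly
    # and takes care of triggering the component jobs.
    DagInput().run_dag(JOBS_RUN_ORDER)
```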
@@ -412,12 +412,12 @@ team = my-team requirements.txt ```text -vdk-meta-jobs +vdk-dag ```
-Note that the VDK Meta Job does not require the `vdk-trino` dependency. -Component jobs are responsible for their own dependencies, and the Meta Job only handles their triggering. +Note that the VDK DAG does not require the `vdk-trino` dependency. +Component jobs are responsible for their own dependencies, and the DAG only handles their triggering. Execution --------- @@ -430,30 +430,30 @@ folders that you have created, and type the following commands one by one: ```console vdk create -n ingest-job1 -t my-team --no-template -u http://localhost:8092 && \ -vdk deploy -n ingest-job1 -t my-team -p ingest-job1 -r "metajob-example" -u http://localhost:8092 +vdk deploy -n ingest-job1 -t my-team -p ingest-job1 -r "dag-example" -u http://localhost:8092 ``` ```console vdk create -n ingest-job2 -t my-team --no-template -u http://localhost:8092 && \ -vdk deploy -n ingest-job2 -t my-team -p ingest-job2 -r "metajob-example" -u http://localhost:8092 +vdk deploy -n ingest-job2 -t my-team -p ingest-job2 -r "dag-example" -u http://localhost:8092 ``` ```console vdk create -n read-data-job -t my-team --no-template -u http://localhost:8092 && \ -vdk deploy -n read-job -t my-team -p read-job -r "metajob-example" -u http://localhost:8092 +vdk deploy -n read-data-job -t my-team -p read-data-job -r "dag-example" -u http://localhost:8092 ``` ```console -vdk create -n example-meta-job -t my-team --no-template -u http://localhost:8092 && \ -vdk deploy -n example-meta-job -t my-team -p example-meta-job -r "metajob-example" -u http://localhost:8092 +vdk create -n example-dag -t my-team --no-template -u http://localhost:8092 && \ +vdk deploy -n example-dag -t my-team -p example-dag -r "dag-example" -u http://localhost:8092 ``` -You can now run your Meta Job through the Execution API by using the following command: +You can now run your DAG through the Execution API by using the following command: ```console -vdk execute --start -n example-meta-job -t my-team -u http://localhost:8092 +vdk execute --start -n example-dag -t my-team -u http://localhost:8092 ``` -Alternatively, if you would like your Meta Job to run on a set schedule, you can configure +Alternatively, if you would like your DAG to run on a set schedule, you can configure its cron schedule in its config.ini file as you would with any other Data Job. diff --git a/examples/metajob-example/example-meta-job/config.ini b/examples/dag-example/example-dag/config.ini similarity index 100% rename from examples/metajob-example/example-meta-job/config.ini rename to examples/dag-example/example-dag/config.ini diff --git a/examples/metajob-example/example-meta-job/example_meta_job.py b/examples/dag-example/example-dag/example_dag.py similarity index 66% rename from examples/metajob-example/example-meta-job/example_meta_job.py rename to examples/dag-example/example-dag/example_dag.py index 8d61ec2023..042eda7fe7 100644 --- a/examples/metajob-example/example-meta-job/example_meta_job.py +++ b/examples/dag-example/example-dag/example_dag.py @@ -1,29 +1,29 @@ # Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0 -from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput +from vdk.plugin.dag.dag_runner import DagInput JOBS_RUN_ORDER = [ { "job_name": "ingest-job1", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": [], }, { "job_name": "ingest-job2", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": [], }, { "job_name": "read-data-job", "team_name": "my-team", - "fail_meta_job_on_error": True, + "fail_dag_on_error": True, "depends_on": ["ingest-job1", "ingest-job2"], }, ] def run(job_input): - MetaJobInput().run_meta_job(JOBS_RUN_ORDER) + DagInput().run_dag(JOBS_RUN_ORDER) diff --git a/examples/dag-example/example-dag/requirements.txt b/examples/dag-example/example-dag/requirements.txt new file mode 100644 index 0000000000..e4f62cfaca --- /dev/null +++ b/examples/dag-example/example-dag/requirements.txt @@ -0,0 +1 @@ +vdk-dag diff --git a/examples/metajob-example/images/metajob-graph.png b/examples/dag-example/images/dag-graph.png similarity index 100% rename from examples/metajob-example/images/metajob-graph.png rename to examples/dag-example/images/dag-graph.png diff --git a/examples/dag-example/ingest-job1/01_drop_table.sql b/examples/dag-example/ingest-job1/01_drop_table.sql new file mode 100644 index 0000000000..39fe6b08e9 --- /dev/null +++ b/examples/dag-example/ingest-job1/01_drop_table.sql @@ -0,0 +1 @@ +drop table if exists memory.default.test_dag_one diff --git a/examples/metajob-example/ingest-job1/10_insert_data.py b/examples/dag-example/ingest-job1/10_insert_data.py similarity index 88% rename from examples/metajob-example/ingest-job1/10_insert_data.py rename to examples/dag-example/ingest-job1/10_insert_data.py index 53808b988c..1ee0fc35e5 100644 --- a/examples/metajob-example/ingest-job1/10_insert_data.py +++ b/examples/dag-example/ingest-job1/10_insert_data.py @@ -16,14 +16,14 @@ def run(job_input: IJobInput): rows = [tuple(i.values()) for i in data] insert_query = """ - INSERT INTO memory.default.test_metajob_one VALUES + INSERT INTO memory.default.test_dag_one VALUES """ + ", ".join( str(i) for i in rows ) job_input.execute_query( """ - CREATE TABLE IF NOT EXISTS memory.default.test_metajob_one + CREATE TABLE IF NOT EXISTS memory.default.test_dag_one ( id varchar, first_name varchar, diff --git a/examples/metajob-example/ingest-job1/config.ini b/examples/dag-example/ingest-job1/config.ini similarity index 100% rename from examples/metajob-example/ingest-job1/config.ini rename to examples/dag-example/ingest-job1/config.ini diff --git a/examples/metajob-example/ingest-job1/data.json b/examples/dag-example/ingest-job1/data.json similarity index 100% rename from examples/metajob-example/ingest-job1/data.json rename to examples/dag-example/ingest-job1/data.json diff --git a/examples/metajob-example/ingest-job1/requirements.txt b/examples/dag-example/ingest-job1/requirements.txt similarity index 100% rename from examples/metajob-example/ingest-job1/requirements.txt rename to examples/dag-example/ingest-job1/requirements.txt diff --git a/examples/dag-example/ingest-job2/01_drop_table.sql b/examples/dag-example/ingest-job2/01_drop_table.sql new file mode 100644 index 0000000000..71bb65b0e5 --- /dev/null +++ b/examples/dag-example/ingest-job2/01_drop_table.sql @@ -0,0 +1 @@ +drop table if exists memory.default.test_dag_two diff --git a/examples/metajob-example/ingest-job2/10_insert_data.py b/examples/dag-example/ingest-job2/10_insert_data.py 
similarity index 88% rename from examples/metajob-example/ingest-job2/10_insert_data.py rename to examples/dag-example/ingest-job2/10_insert_data.py index 8236ad52ab..75f2bb4107 100644 --- a/examples/metajob-example/ingest-job2/10_insert_data.py +++ b/examples/dag-example/ingest-job2/10_insert_data.py @@ -16,14 +16,14 @@ def run(job_input: IJobInput): rows = [tuple(i.values()) for i in data] insert_query = """ - INSERT INTO memory.default.test_metajob_two VALUES + INSERT INTO memory.default.test_dag_two VALUES """ + ", ".join( str(i) for i in rows ) job_input.execute_query( """ - CREATE TABLE IF NOT EXISTS memory.default.test_metajob_two + CREATE TABLE IF NOT EXISTS memory.default.test_dag_two ( id integer, first_name varchar, diff --git a/examples/metajob-example/ingest-job2/config.ini b/examples/dag-example/ingest-job2/config.ini similarity index 100% rename from examples/metajob-example/ingest-job2/config.ini rename to examples/dag-example/ingest-job2/config.ini diff --git a/examples/metajob-example/ingest-job2/data.json b/examples/dag-example/ingest-job2/data.json similarity index 100% rename from examples/metajob-example/ingest-job2/data.json rename to examples/dag-example/ingest-job2/data.json diff --git a/examples/metajob-example/ingest-job2/requirements.txt b/examples/dag-example/ingest-job2/requirements.txt similarity index 100% rename from examples/metajob-example/ingest-job2/requirements.txt rename to examples/dag-example/ingest-job2/requirements.txt diff --git a/examples/metajob-example/read-data-job/10_read.py b/examples/dag-example/read-data-job/10_read.py similarity index 87% rename from examples/metajob-example/read-data-job/10_read.py rename to examples/dag-example/read-data-job/10_read.py index 55b9568179..2ee386a927 100644 --- a/examples/metajob-example/read-data-job/10_read.py +++ b/examples/dag-example/read-data-job/10_read.py @@ -4,7 +4,7 @@ def run(job_input: IJobInput): - job1_data = job_input.execute_query("SELECT * FROM memory.default.test_metajob_one") - job2_data = job_input.execute_query("SELECT * FROM memory.default.test_metajob_two") + job1_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_one") + job2_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_two") print(f"Job 1 Data ===> {job1_data} \n\n\n Job 2 Data ===> {job2_data}") diff --git a/examples/dag-example/read-data-job/20_drop_table_one.sql b/examples/dag-example/read-data-job/20_drop_table_one.sql new file mode 100644 index 0000000000..39fe6b08e9 --- /dev/null +++ b/examples/dag-example/read-data-job/20_drop_table_one.sql @@ -0,0 +1 @@ +drop table if exists memory.default.test_dag_one diff --git a/examples/dag-example/read-data-job/30_drop_table_two.sql b/examples/dag-example/read-data-job/30_drop_table_two.sql new file mode 100644 index 0000000000..71bb65b0e5 --- /dev/null +++ b/examples/dag-example/read-data-job/30_drop_table_two.sql @@ -0,0 +1 @@ +drop table if exists memory.default.test_dag_two diff --git a/examples/metajob-example/read-data-job/config.ini b/examples/dag-example/read-data-job/config.ini similarity index 100% rename from examples/metajob-example/read-data-job/config.ini rename to examples/dag-example/read-data-job/config.ini diff --git a/examples/metajob-example/read-data-job/requirements.txt b/examples/dag-example/read-data-job/requirements.txt similarity index 100% rename from examples/metajob-example/read-data-job/requirements.txt rename to examples/dag-example/read-data-job/requirements.txt diff --git 
a/examples/dag-with-args-example/README.md b/examples/dag-with-args-example/README.md index 60c4ade39a..08b3646ade 100644 --- a/examples/dag-with-args-example/README.md +++ b/examples/dag-with-args-example/README.md @@ -568,7 +568,7 @@ def run(job_input) -> None: ``` -Note that the `run_dag` method belongs to the `DAGInput` object which must be imported +Note that the `run_dag` method belongs to the `DagInput` object which must be imported and instantiated separately from the default `IJobInput` object which is passed to the `run` function by default.
@@ -588,8 +588,8 @@ team = my-team [vdk] dags_max_concurrent_running_jobs = 2 -dags_delayed_jobs_min_delay_seconds = 1 dags_delayed_jobs_randomized_added_delay_seconds = 1 +dags_delayed_jobs_min_delay_seconds = 1 ```
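These keys can usually also be supplied as environment variables instead of config.ini entries, since VDK reads upper-cased variables prefixed with `VDK_`. A rough equivalent of the snippet above is shown below; treat the exact variable names as an assumption and verify them against the configuration reference linked further down:

```console
# Assumed environment-variable equivalents of the [vdk] settings above.
export VDK_DAGS_MAX_CONCURRENT_RUNNING_JOBS=2
export VDK_DAGS_DELAYED_JOBS_MIN_DELAY_SECONDS=1
export VDK_DAGS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS=1
```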
@@ -622,6 +622,18 @@ The other two configurations are set in order to have a short fixed delay for de Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py) for more details. +
+ requirements.txt + +```text +vdk-dag +``` +
+ +Note that the VDK DAG Job does not require the `vdk-trino` dependency. +Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering. + + ## Execution [Here](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag#high-level-design) you can read diff --git a/examples/dag-with-args-example/dag-job/config.ini b/examples/dag-with-args-example/dag-job/config.ini index cdf8865fbe..d8aa8462e3 100644 --- a/examples/dag-with-args-example/dag-job/config.ini +++ b/examples/dag-with-args-example/dag-job/config.ini @@ -10,6 +10,6 @@ team = my-team [vdk] -meta_jobs_max_concurrent_running_jobs = 2 -meta_jobs_delayed_jobs_min_delay_seconds = 1 -meta_jobs_delayed_jobs_randomized_added_delay_seconds = 1 +dags_max_concurrent_running_jobs = 2 +dags_delayed_jobs_min_delay_seconds = 1 +dags_delayed_jobs_randomized_added_delay_seconds = 1 diff --git a/examples/metajob-example/example-meta-job/requirements.txt b/examples/metajob-example/example-meta-job/requirements.txt deleted file mode 100644 index 3ee5dcc5ac..0000000000 --- a/examples/metajob-example/example-meta-job/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -vdk-meta-jobs diff --git a/examples/metajob-example/ingest-job1/01_drop_table.sql b/examples/metajob-example/ingest-job1/01_drop_table.sql deleted file mode 100644 index d084fbd447..0000000000 --- a/examples/metajob-example/ingest-job1/01_drop_table.sql +++ /dev/null @@ -1 +0,0 @@ -drop table if exists memory.default.test_metajob_one diff --git a/examples/metajob-example/ingest-job2/01_drop_table.sql b/examples/metajob-example/ingest-job2/01_drop_table.sql deleted file mode 100644 index 88519273b3..0000000000 --- a/examples/metajob-example/ingest-job2/01_drop_table.sql +++ /dev/null @@ -1 +0,0 @@ -drop table if exists memory.default.test_metajob_two diff --git a/examples/metajob-example/read-data-job/20_drop_table_one.sql b/examples/metajob-example/read-data-job/20_drop_table_one.sql deleted file mode 100644 index d084fbd447..0000000000 --- a/examples/metajob-example/read-data-job/20_drop_table_one.sql +++ /dev/null @@ -1 +0,0 @@ -drop table if exists memory.default.test_metajob_one diff --git a/examples/metajob-example/read-data-job/30_drop_table_two.sql b/examples/metajob-example/read-data-job/30_drop_table_two.sql deleted file mode 100644 index 88519273b3..0000000000 --- a/examples/metajob-example/read-data-job/30_drop_table_two.sql +++ /dev/null @@ -1 +0,0 @@ -drop table if exists memory.default.test_metajob_two
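As an optional local smoke test of the renamed example (assuming the job folders are laid out as in this change and a local Trino instance is configured as described in the README), the component jobs can be run directly with the VDK CLI before they are deployed and wired into the DAG:

```console
# Run the component jobs locally, in dependency order, to verify they work
# on their own before deploying them and triggering the DAG.
vdk run ingest-job1
vdk run ingest-job2
vdk run read-data-job
```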