vdk-examples: Change Meta Jobs to DAGs in examples (#2024)
This PR corrects two examples to bring them in line with the changed
name of VDK DAGs.

---------

Signed-off-by: Gabriel Georgiev <[email protected]>
gabrielgeorgiev1 authored May 10, 2023
1 parent 9145920 commit 7fb3a5d
Showing 27 changed files with 73 additions and 61 deletions.
@@ -1,18 +1,18 @@
Overview
--------
Meta Jobs allow VDK users to schedule jobs in a directed acyclic graph.
DAGs allow VDK users to schedule jobs in a directed acyclic graph.
This means that jobs can be configured to run only when a set of previous jobs have finished successfully.

In this example we will use the Versatile Data Kit to develop four Data jobs - two of these jobs will read data
from separate json files, and will subsequently insert the data into Trino tables. The third job will read the data
inserted by the previous two jobs, and will print the data to the terminal. The fourth Data Job will be a Meta Job
inserted by the previous two jobs, and will print the data to the terminal. The fourth Data Job will be a VDK DAG
which will manage the other three and ensure that the third job runs only when the previous two finish successfully.

The Meta Job uses a separate job input object separate from the one usually used for job
The DAG uses a separate job input object separate from the one usually used for job
operations in VDK Data Jobs and must be imported.
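For a quick preview of both ideas (the dependency list and the dedicated DAG input object), here is a condensed sketch distilled from the full example later in this README, trimmed down to two jobs:

```python
from vdk.plugin.dag.dag_runner import DagInput

# Each entry describes one Data Job; "depends_on" lists the jobs that must
# finish successfully before it is started.
JOBS_RUN_ORDER = [
    {"job_name": "ingest-job1", "team_name": "my-team",
     "fail_dag_on_error": True, "depends_on": []},
    {"job_name": "read-data-job", "team_name": "my-team",
     "fail_dag_on_error": True, "depends_on": ["ingest-job1"]},
]


def run(job_input):
    # DagInput is instantiated here; it is not the job_input argument
    # that VDK passes to run() by default.
    DagInput().run_dag(JOBS_RUN_ORDER)
```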

The graph for our Meta Job will look like this:
![Meta Job graph](images/metajob-graph.png)
The graph for our DAG will look like this:
![DAG graph](images/dag-graph.png)

Before you continue, make sure you are familiar with the
[Getting Started](https://github.com/vmware/versatile-data-kit/wiki/Getting-Started) section of the wiki.
@@ -23,9 +23,9 @@ Code
The relevant Data Job code is available
[here](https://github.com/vmware/versatile-data-kit/tree/main/examples).

You can follow along and run this Meta Job on your machine;
You can follow along and run this DAG on your machine;
alternatively, you can use the available code as a template and extend it to
make a Meta Job that fits your use case more closely.
make a DAG that fits your use case more closely.

Data
--------
@@ -40,15 +40,15 @@ To run this example, you need
* Versatile Data Kit
* Trino DB
* `vdk-trino` - VDK plugin for a connection to a Trino database
* `vdk-meta-jobs` - VDK plugin for Meta Job functionality
* `vdk-dag` - VDK plugin for DAG functionality

Configuration
-------------
If you have not done so already, you can install Versatile Data Kit and the
plugins required for this example by running the following commands from a terminal:
```console
pip install quickstart-vdk
pip install vdk-meta-jobs
pip install vdk-dag
```
Note that Versatile Data Kit requires Python 3.7+. See the
[Installation page](https://github.com/vmware/versatile-data-kit/wiki/Installation#install-sdk) for more details.
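A quick way to confirm the prerequisites are in place (not part of the original walkthrough, and the exact output will vary by environment) is:

```console
python -V
pip show quickstart-vdk vdk-dag
```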
@@ -75,7 +75,7 @@ vdk server --install
Data Jobs
--------

Our three Datajobs have the following structure:
Our three Data jobs have the following structure:

```
ingest-job1/
@@ -90,7 +90,7 @@ ingest-job1/
<summary>01_drop_table.sql</summary>

```sql
drop table if exists memory.default.test_metajob_one
drop table if exists memory.default.test_dag_one
```
</details>

@@ -114,12 +114,12 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
INSERT INTO memory.default.test_metajob_one VALUES
INSERT INTO memory.default.test_dag_one VALUES
""" + ", ".join(str(i) for i in rows)

job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS memory.default.test_metajob_one
CREATE TABLE IF NOT EXISTS memory.default.test_dag_one
(
id varchar,
first_name varchar,
@@ -187,7 +187,7 @@ ingest-job2/
<summary>01_drop_table.sql</summary>

```sql
drop table if exists memory.default.test_metajob_two
drop table if exists memory.default.test_dag_two
```
</details>

@@ -211,12 +211,12 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
INSERT INTO memory.default.test_metajob_two VALUES
INSERT INTO memory.default.test_dag_two VALUES
""" + ", ".join(str(i) for i in rows)

job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS memory.default.test_metajob_two
CREATE TABLE IF NOT EXISTS memory.default.test_dag_two
(
id integer,
first_name varchar,
@@ -289,10 +289,10 @@ from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
job1_data = job_input.execute_query(
"SELECT * FROM memory.default.test_metajob_one"
"SELECT * FROM memory.default.test_dag_one"
)
job2_data = job_input.execute_query(
"SELECT * FROM memory.default.test_metajob_two"
"SELECT * FROM memory.default.test_dag_two"
)

print(
@@ -305,15 +305,15 @@ def run(job_input: IJobInput):
<summary>20_drop_table_one.sql</summary>

```sql
drop table if exists memory.default.test_metajob_one
drop table if exists memory.default.test_dag_one
```
</details>

<details>
<summary>30_drop_table_two.sql</summary>

```sql
drop table if exists memory.default.test_metajob_two
drop table if exists memory.default.test_dag_two
```
</details>

@@ -347,48 +347,48 @@ vdk-trino
</details>

```
example-meta-job/
├── example_meta_job.py
example-dag/
├── example_dag.py
├── config.ini
├── requirements.txt
```

<details>
<summary>example_meta_job.py</summary>
<summary>example_dag.py</summary>

```python
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job1",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": []
},

{
"job_name": "ingest-job2",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": []
},
{
"job_name": "read-data-job",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": ["ingest-job1", "ingest-job2"]
},
]

def run(job_input):
MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
DagInput().run_dag(JOBS_RUN_ORDER)

```
</details>

Note that the `run_meta_job` method belongs to the `MetaJobInput` object which must be imported
Note that the `run_dag` method belongs to the `DagInput` object which must be imported
and instantiated separately from the default `IJobInput` object which is passed to the `run` function by default.

<details>
@@ -412,12 +412,12 @@ team = my-team
<summary>requirements.txt</summary>

```text
vdk-meta-jobs
vdk-dag
```
</details>

Note that the VDK Meta Job does not require the `vdk-trino` dependency.
Component jobs are responsible for their own dependencies, and the Meta Job only handles their triggering.
Note that the VDK DAG does not require the `vdk-trino` dependency.
Component jobs are responsible for their own dependencies, and the DAG only handles their triggering.
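Concretely, that split comes down to the requirements files: the DAG job declares only the DAG plugin, while any component job that talks to Trino declares `vdk-trino` itself (file paths below are for orientation only):

```text
# example-dag/requirements.txt  -> the DAG only orchestrates
vdk-dag

# read-data-job/requirements.txt (and likewise the ingest jobs) -> each
# component job declares the plugins it actually uses
vdk-trino
```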

Execution
---------
@@ -430,30 +430,30 @@ folders that you have created, and type the following commands one by one:

```console
vdk create -n ingest-job1 -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n ingest-job1 -t my-team -p ingest-job1 -r "metajob-example" -u http://localhost:8092
vdk deploy -n ingest-job1 -t my-team -p ingest-job1 -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n ingest-job2 -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n ingest-job2 -t my-team -p ingest-job2 -r "metajob-example" -u http://localhost:8092
vdk deploy -n ingest-job2 -t my-team -p ingest-job2 -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n read-job -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n read-job -t my-team -p read-job -r "metajob-example" -u http://localhost:8092
vdk deploy -n read-job -t my-team -p read-job -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n example-meta-job -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n example-meta-job -t my-team -p example-meta-job -r "metajob-example" -u http://localhost:8092
vdk create -n example-dag -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n example-dag -t my-team -p example-dag -r "dag-example" -u http://localhost:8092
```

You can now run your Meta Job through the Execution API by using the following command:
You can now run your DAG through the Execution API by using the following command:
```console
vdk execute --start -n example-meta-job -t my-team -u http://localhost:8092
vdk execute --start -n example-dag -t my-team -u http://localhost:8092
```

Alternatively, if you would like your Meta Job to run on a set schedule, you can configure
Alternatively, if you would like your DAG to run on a set schedule, you can configure
its cron schedule in its config.ini file as you would with any other Data Job.
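For instance, a config.ini that schedules the DAG job to run every day at 01:00 might look like this; `schedule_cron` is the standard VDK scheduling key, and the cron expression here is only an illustration:

```ini
[owner]
team = my-team

[job]
; run the DAG job daily at 01:00 (minute hour day-of-month month day-of-week)
schedule_cron = 0 1 * * *
```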


File renamed without changes.
@@ -1,29 +1,29 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job1",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": [],
},
{
"job_name": "ingest-job2",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": [],
},
{
"job_name": "read-data-job",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": ["ingest-job1", "ingest-job2"],
},
]


def run(job_input):
MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
DagInput().run_dag(JOBS_RUN_ORDER)
1 change: 1 addition & 0 deletions examples/dag-example/example-dag/requirements.txt
@@ -0,0 +1 @@
vdk-dag
File renamed without changes.
1 change: 1 addition & 0 deletions examples/dag-example/ingest-job1/01_drop_table.sql
@@ -0,0 +1 @@
drop table if exists memory.default.test_dag_one
@@ -16,14 +16,14 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
INSERT INTO memory.default.test_metajob_one VALUES
INSERT INTO memory.default.test_dag_one VALUES
""" + ", ".join(
str(i) for i in rows
)

job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS memory.default.test_metajob_one
CREATE TABLE IF NOT EXISTS memory.default.test_dag_one
(
id varchar,
first_name varchar,
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions examples/dag-example/ingest-job2/01_drop_table.sql
@@ -0,0 +1 @@
drop table if exists memory.default.test_dag_two
@@ -16,14 +16,14 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
INSERT INTO memory.default.test_metajob_two VALUES
INSERT INTO memory.default.test_dag_two VALUES
""" + ", ".join(
str(i) for i in rows
)

job_input.execute_query(
"""
CREATE TABLE IF NOT EXISTS memory.default.test_metajob_two
CREATE TABLE IF NOT EXISTS memory.default.test_dag_two
(
id integer,
first_name varchar,
File renamed without changes.
File renamed without changes.
@@ -4,7 +4,7 @@


def run(job_input: IJobInput):
job1_data = job_input.execute_query("SELECT * FROM memory.default.test_metajob_one")
job2_data = job_input.execute_query("SELECT * FROM memory.default.test_metajob_two")
job1_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_one")
job2_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_two")

print(f"Job 1 Data ===> {job1_data} \n\n\n Job 2 Data ===> {job2_data}")
1 change: 1 addition & 0 deletions examples/dag-example/read-data-job/20_drop_table_one.sql
@@ -0,0 +1 @@
drop table if exists memory.default.test_dag_one
1 change: 1 addition & 0 deletions examples/dag-example/read-data-job/30_drop_table_two.sql
@@ -0,0 +1 @@
drop table if exists memory.default.test_dag_two
16 changes: 14 additions & 2 deletions examples/dag-with-args-example/README.md
@@ -568,7 +568,7 @@ def run(job_input) -> None:
```
</details>

Note that the `run_dag` method belongs to the `DAGInput` object which must be imported
Note that the `run_dag` method belongs to the `DagInput` object which must be imported
and instantiated separately from the default `IJobInput` object which is passed to the `run` function by default.

<details>
@@ -588,8 +588,8 @@ team = my-team

[vdk]
dags_max_concurrent_running_jobs = 2
dags_delayed_jobs_min_delay_seconds = 1
dags_delayed_jobs_randomized_added_delay_seconds = 1
dags_delayed_jobs_min_delay_seconds = 1
```
</details>

@@ -622,6 +622,18 @@ The other two configurations are set in order to have a short fixed delay for de
Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py)
for more details.
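If you would rather not hard-code these values, VDK configuration options can usually also be supplied as environment variables with a `VDK_` prefix; the exact variable names below are an assumption derived from the option names, so verify them against the linked configuration module:

```console
# assumed env-var spellings (VDK_ + upper-cased option name)
export VDK_DAGS_MAX_CONCURRENT_RUNNING_JOBS=2
export VDK_DAGS_DELAYED_JOBS_MIN_DELAY_SECONDS=1
export VDK_DAGS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS=1
```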

<details>
<summary>requirements.txt</summary>

```text
vdk-dag
```
</details>

Note that the VDK DAG Job does not require the `vdk-trino` dependency.
Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.


## Execution

[Here](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag#high-level-design) you can read
6 changes: 3 additions & 3 deletions examples/dag-with-args-example/dag-job/config.ini
@@ -10,6 +10,6 @@
team = my-team

[vdk]
meta_jobs_max_concurrent_running_jobs = 2
meta_jobs_delayed_jobs_min_delay_seconds = 1
meta_jobs_delayed_jobs_randomized_added_delay_seconds = 1
dags_max_concurrent_running_jobs = 2
dags_delayed_jobs_min_delay_seconds = 1
dags_delayed_jobs_randomized_added_delay_seconds = 1
1 change: 0 additions & 1 deletion examples/metajob-example/example-meta-job/requirements.txt

This file was deleted.

1 change: 0 additions & 1 deletion examples/metajob-example/ingest-job1/01_drop_table.sql

This file was deleted.

1 change: 0 additions & 1 deletion examples/metajob-example/ingest-job2/01_drop_table.sql

This file was deleted.

This file was deleted.
