vdk-examples: Change Meta Jobs to DAGs in examples #2024

Merged · 4 commits · May 10, 2023
Overview
--------
DAGs allow VDK users to schedule jobs in a directed acyclic graph.
This means that jobs can be configured to run only when a set of previous jobs have finished successfully.

In this example we will use the Versatile Data Kit to develop four Data Jobs: two of these jobs will read data
from separate JSON files and insert it into Trino tables. The third job will read the data
inserted by the previous two jobs and print it to the terminal. The fourth Data Job will be a VDK DAG
which will manage the other three and ensure that the third job runs only when the previous two finish successfully.

The DAG uses its own job input object, separate from the one usually used for job
operations in VDK Data Jobs, and it must be imported.
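
For reference, the DAG job's `run` function imports and instantiates this object as shown in the full example later in this guide:

```python
from vdk.plugin.dag.dag_runner import DagInput


def run(job_input):
    # The DAG uses DagInput, not the regular job_input parameter.
    # JOBS_RUN_ORDER is the list of job specifications defined later in this guide.
    DagInput().run_dag(JOBS_RUN_ORDER)
```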

The graph for our DAG will look like this:
![DAG graph](images/dag-graph.png)

Before you continue, make sure you are familiar with the
[Getting Started](https://github.com/vmware/versatile-data-kit/wiki/Getting-Started) section of the wiki.
Code
--------
The relevant Data Job code is available
[here](https://github.com/vmware/versatile-data-kit/tree/main/examples).

You can follow along and run this DAG on your machine;
alternatively, you can use the available code as a template and extend it to
make a DAG that fits your use case more closely.

Data
--------
Requirements
--------
To run this example, you need
* Versatile Data Kit
* Trino DB
* `vdk-trino` - VDK plugin for a connection to a Trino database
* `vdk-dag` - VDK plugin for DAG functionality

Configuration
-------------
If you have not done so already, you can install Versatile Data Kit and the
plugins required for this example by running the following commands from a terminal:
```console
pip install quickstart-vdk
pip install vdk-dag
```
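
As a quick optional check that the plugin is importable (assuming only the module path already used by the examples below), you can run:

```python
# Sanity check: vdk-dag installs the vdk.plugin.dag package used in the examples.
from vdk.plugin.dag.dag_runner import DagInput

print(DagInput)  # raises ImportError if vdk-dag is not installed
```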
Note that Versatile Data Kit requires Python 3.7+. See the
[Installation page](https://github.com/vmware/versatile-data-kit/wiki/Installation#install-sdk) for more details.
To deploy the Data Jobs, this example uses a locally installed VDK Control Service, which you can set up with:
```console
vdk server --install
```

Data Jobs
--------

Our three Data Jobs have the following structure:

```
ingest-job1/
```

<details>
<summary>01_drop_table.sql</summary>

```sql
drop table if exists memory.default.test_dag_one
```
</details>

```python
def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
    INSERT INTO memory.default.test_dag_one VALUES
""" + ", ".join(str(i) for i in rows)

job_input.execute_query(
"""
        CREATE TABLE IF NOT EXISTS memory.default.test_dag_one
(
id varchar,
first_name varchar,
```

```
ingest-job2/
```

<details>
<summary>01_drop_table.sql</summary>

```sql
drop table if exists memory.default.test_dag_two
```
</details>

```python
def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
    INSERT INTO memory.default.test_dag_two VALUES
""" + ", ".join(str(i) for i in rows)

job_input.execute_query(
"""
        CREATE TABLE IF NOT EXISTS memory.default.test_dag_two
(
id integer,
first_name varchar,
```

```python
from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
job1_data = job_input.execute_query(
"SELECT * FROM memory.default.test_metajob_one"
"SELECT * FROM memory.default.test_dag_one"
)
job2_data = job_input.execute_query(
"SELECT * FROM memory.default.test_metajob_two"
"SELECT * FROM memory.default.test_dag_two"
)

    print(
        f"Job 1 Data ===> {job1_data} \n\n\n Job 2 Data ===> {job2_data}"
    )
```

<details>
<summary>20_drop_table_one.sql</summary>

```sql
drop table if exists memory.default.test_dag_one
```
</details>

<details>
<summary>30_drop_table_two.sql</summary>

```sql
drop table if exists memory.default.test_dag_two
```
</details>

<details>
<summary>requirements.txt</summary>

```text
vdk-trino
```
</details>

```
example-dag/
├── example_dag.py
├── config.ini
├── requirements.txt
```

<details>
<summary>example_dag.py</summary>

```python
from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job1",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": []
    },
    {
"job_name": "ingest-job2",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": []
},
{
"job_name": "read-data-job",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": ["ingest-job1", "ingest-job2"]
},
]

def run(job_input):
    DagInput().run_dag(JOBS_RUN_ORDER)

```
</details>

Note that the `run_dag` method belongs to the `DagInput` object, which must be imported
and instantiated separately from the default `IJobInput` object that is passed to the `run` function.
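
Each entry's `depends_on` list is what encodes the edges of the graph: a job runs only after every job it names has finished successfully. As a minimal illustration (a hypothetical helper, not part of the `vdk-dag` plugin):

```python
# Hypothetical helper: derive the DAG edges from the depends_on lists.
jobs = [
    {"job_name": "ingest-job1", "depends_on": []},
    {"job_name": "ingest-job2", "depends_on": []},
    {"job_name": "read-data-job", "depends_on": ["ingest-job1", "ingest-job2"]},
]

edges = [(dep, job["job_name"]) for job in jobs for dep in job["depends_on"]]
print(edges)  # [('ingest-job1', 'read-data-job'), ('ingest-job2', 'read-data-job')]
```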

<details>
<summary>config.ini</summary>

```ini
team = my-team
```
</details>

<details>
<summary>requirements.txt</summary>

```text
vdk-dag
```
</details>

Note that the VDK DAG does not require the `vdk-trino` dependency.
Component jobs are responsible for their own dependencies, and the DAG only handles their triggering.

Execution
---------
Open a terminal in the parent directory of the job folders that you have created, and type the following commands one by one:

```console
vdk create -n ingest-job1 -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n ingest-job1 -t my-team -p ingest-job1 -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n ingest-job2 -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n ingest-job2 -t my-team -p ingest-job2 -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n read-job -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n read-job -t my-team -p read-job -r "dag-example" -u http://localhost:8092
```

```console
vdk create -n example-dag -t my-team --no-template -u http://localhost:8092 && \
vdk deploy -n example-dag -t my-team -p example-dag -r "dag-example" -u http://localhost:8092
```
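
If you prefer to script these four create-and-deploy steps, a small wrapper along these lines works too (a sketch that shells out to the same `vdk` CLI commands shown above; it assumes `vdk` is on your PATH):

```python
# Sketch: run the same vdk create/deploy commands for all four jobs.
import subprocess

URL = "http://localhost:8092"

for job in ["ingest-job1", "ingest-job2", "read-job", "example-dag"]:
    subprocess.run(["vdk", "create", "-n", job, "-t", "my-team",
                    "--no-template", "-u", URL], check=True)
    subprocess.run(["vdk", "deploy", "-n", job, "-t", "my-team",
                    "-p", job, "-r", "dag-example", "-u", URL], check=True)
```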

You can now run your DAG through the Execution API by using the following command:
```console
vdk execute --start -n example-dag -t my-team -u http://localhost:8092
```

Alternatively, if you would like your DAG to run on a set schedule, you can configure
its cron schedule in its config.ini file as you would with any other Data Job.
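
For example, a schedule entry in the DAG job's config.ini might look like this (a sketch following the standard Data Job config.ini layout; adjust the cron expression to your needs):

```ini
[owner]
team = my-team

[job]
; Standard Data Job scheduling key, assumed here: run every day at 01:00.
schedule_cron = 0 1 * * *
```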


examples/dag-example/example-dag/example_dag.py

```python
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job1",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": [],
},
{
"job_name": "ingest-job2",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": [],
},
{
"job_name": "read-data-job",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"depends_on": ["ingest-job1", "ingest-job2"],
},
]


def run(job_input):
    DagInput().run_dag(JOBS_RUN_ORDER)
```

examples/dag-example/example-dag/requirements.txt

```text
vdk-dag
```
examples/dag-example/ingest-job1/01_drop_table.sql

```sql
drop table if exists memory.default.test_dag_one
```
```python
def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
    INSERT INTO memory.default.test_dag_one VALUES
""" + ", ".join(
str(i) for i in rows
)

job_input.execute_query(
"""
        CREATE TABLE IF NOT EXISTS memory.default.test_dag_one
(
id varchar,
first_name varchar,
```

examples/dag-example/ingest-job2/01_drop_table.sql

```sql
drop table if exists memory.default.test_dag_two
```
```python
def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = """
    INSERT INTO memory.default.test_dag_two VALUES
""" + ", ".join(
str(i) for i in rows
)

job_input.execute_query(
"""
        CREATE TABLE IF NOT EXISTS memory.default.test_dag_two
(
id integer,
first_name varchar,
```

```python


def run(job_input: IJobInput):
    job1_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_one")
    job2_data = job_input.execute_query("SELECT * FROM memory.default.test_dag_two")

print(f"Job 1 Data ===> {job1_data} \n\n\n Job 2 Data ===> {job2_data}")
```

examples/dag-example/read-data-job/20_drop_table_one.sql

```sql
drop table if exists memory.default.test_dag_one
```
examples/dag-example/read-data-job/30_drop_table_two.sql

```sql
drop table if exists memory.default.test_dag_two
```
examples/dag-with-args-example/README.md

```python
def run(job_input: IJobInput):
data_job_dir = pathlib.Path(job_input.get_job_directory())
data_file = data_job_dir / "data.json"

db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_table = job_input.get_arguments().get("db_table")

```

```python
from vdk.plugin.dag.dag_runner import DagInput

JOBS_RUN_ORDER = [
{
"job_name": "ingest-job-table-one",
"job_name": "ingest-job-table-one ",
"team_name": "my-team",
"fail_dag_on_error": True,
"arguments": {
```

```python
def run(job_input) -> None:
    DagInput().run_dag(JOBS_RUN_ORDER)
```
</details>

Note that the `run_dag` method belongs to the `DagInput` object, which must be imported
and instantiated separately from the default `IJobInput` object that is passed to the `run` function.
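
To make the argument flow concrete, here is a minimal sketch of both sides (the key names `db_catalog`, `db_schema`, and `db_table` come from the snippets above; the values are illustrative):

```python
# DAG side: an entry may carry an "arguments" dict, which VDK passes
# to the component job when it is started (values below are illustrative).
dag_entry = {
    "job_name": "ingest-job-table-one",
    "team_name": "my-team",
    "fail_dag_on_error": True,
    "arguments": {"db_catalog": "memory", "db_schema": "default", "db_table": "test_dag_one"},
    "depends_on": [],
}

# Component-job side: the same keys come back via job_input.get_arguments().
def run(job_input):
    db_table = job_input.get_arguments().get("db_table")  # -> "test_dag_one"
```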

<details>
<summary>config.ini</summary>

```ini
team = my-team

[vdk]
dags_max_concurrent_running_jobs = 2
dags_delayed_jobs_randomized_added_delay_seconds = 1
dags_delayed_jobs_min_delay_seconds = 1
```
</details>

The other two configurations are set in order to have a short fixed delay for delayed jobs.
Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py)
for more details.

<details>
<summary>requirements.txt</summary>

```text
vdk-dag
```
</details>

Note that the VDK DAG Job does not require the `vdk-trino` dependency.
Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.


## Execution

[Here](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag#high-level-design) you can read
more about the high-level design of VDK DAGs.