vdk-heartbeat: trino ingestion test #514

Merged
merged 2 commits into from
Nov 16, 2021
59 changes: 59 additions & 0 deletions projects/vdk-heartbeat/README.md
@@ -63,3 +63,62 @@ Versioning follows https://semver.org.

* A release step in Gitlab CI is automatically triggered after merging changes if build/tests are successful.
* Update major or minor version when necessary only.

## Tests
### Database ingestion
The testing job ingests data into a database and reads it from that database to verify the results.
#### Configuration
##### Target
Target identifies where the data should be ingested.

Its value depends on the chosen ingestion method:
* For the "http" method, the target is an HTTP URL.
  Example: http://example.com/<some>/<api>/<endpoint>
* For the "file" method, the target is a file name or path.
```
export VDK_HEARTBEAT_INGEST_TARGET="datasource"
```
##### Method
Indicates the ingestion method to be used. Example:
* method="file" -> ingest to file
* method="http" -> ingest using HTTP POST requests
* method="kafka" -> ingest to kafka endpoint
```
export VDK_HEARTBEAT_INGEST_METHOD="http"
```
##### Destination table
The name of the table into which the data should be ingested.
This parameter can be omitted if the table name is
included in the payload itself.
```
export VDK_HEARTBEAT_INGEST_DESTINATION_TABLE="destination_table"
```
##### Database type
```
export DB_DEFAULT_TYPE="trino"
```
##### Database name
```
export DATABASE_TEST_DB="memory.default"
```
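As a rough illustration of how these variables resolve, the sketch below falls back to the default values used in `config.py` from this change when a variable is unset. The helper name `resolve_ingest_settings` and the `INGEST_DEFAULTS` mapping are hypothetical, and the sketch assumes values come from environment variables only; the real `get_value` may consult other sources as well.

```python
import os

# Defaults copied from config.py in this change; the mapping itself is a
# hypothetical convenience for this sketch.
INGEST_DEFAULTS = {
    "VDK_HEARTBEAT_INGEST_TARGET": "vdk-heartbeat-datasource",
    "VDK_HEARTBEAT_INGEST_METHOD": "http",
    "VDK_HEARTBEAT_INGEST_DESTINATION_TABLE": "vdk_heartbeat_ingestion_test",
    "VDK_HEARTBEAT_INGEST_TIMEOUT": "300",
}


def resolve_ingest_settings(environ=None):
    """Return the ingestion settings, using the default for every unset variable."""
    environ = os.environ if environ is None else environ
    return {
        name: environ.get(name, default)
        for name, default in INGEST_DEFAULTS.items()
    }


if __name__ == "__main__":
    # Print the effective settings for the current shell environment.
    for name, value in resolve_ingest_settings().items():
        print(f"{name}={value}")
```

Passing an explicit mapping instead of reading `os.environ` directly keeps the helper easy to exercise without touching the process environment.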
#### Scenarios
##### Trino ingestion
VDK_HEARTBEAT_INGEST_METHOD is set to "TRINO" and DB_DEFAULT_TYPE is set to "TRINO",
and the connection settings for both are the same (the same Trino database instance).
```
export VDK_HEARTBEAT_INGEST_TARGET="trino-http-datasource"
export VDK_HEARTBEAT_INGEST_METHOD="TRINO"
export VDK_HEARTBEAT_INGEST_DESTINATION_TABLE="sample_destination_table"
export DB_DEFAULT_TYPE="trino"
export DATABASE_TEST_DB="memory.default"
```
##### Trino HTTP ingestion
VDK_HEARTBEAT_INGEST_METHOD is set to "HTTP" and DB_DEFAULT_TYPE is set to "TRINO",
and the connection settings point to the same Trino instance.
```
export VDK_HEARTBEAT_INGEST_TARGET="trino-http-datasource"
export VDK_HEARTBEAT_INGEST_METHOD="http"
export VDK_HEARTBEAT_INGEST_DESTINATION_TABLE="sample_destination_table"
export DB_DEFAULT_TYPE="trino"
export DATABASE_TEST_DB="memory.default"
```
27 changes: 27 additions & 0 deletions projects/vdk-heartbeat/src/vdk/internal/heartbeat/config.py
@@ -74,6 +74,33 @@ def __init__(self, config_file=None):
            self.get_value("RUN_TEST_TIMEOUT_SECONDS", "1200")
        )

        # Ingestion test configuration
        # Target identifies where the data should be ingested into.
        self.INGEST_TARGET = self.get_value(
            "VDK_HEARTBEAT_INGEST_TARGET",
            "vdk-heartbeat-datasource",
        )

        """
        Indicates the ingestion method to be used. Example:
            method="file" -> ingest to file
            method="http" -> ingest using HTTP POST requests
            method="kafka" -> ingest to kafka endpoint
        """
        self.INGEST_METHOD = self.get_value("VDK_HEARTBEAT_INGEST_METHOD", "http")

        # The name of the table, where the data should be ingested into.
        self.INGEST_DESTINATION_TABLE = self.get_value(
            "VDK_HEARTBEAT_INGEST_DESTINATION_TABLE",
            "vdk_heartbeat_ingestion_test",
        )

        # The time for which the data should be ingested.
        self.INGEST_TIMEOUT = self.get_value(
            "VDK_HEARTBEAT_INGEST_TIMEOUT",
            "300",
        )

        # The Control Service API URL (http://url/data-jobs) without data-jobs suffix
        self.control_api_url = self._get_atleast_one_value(
            "CONTROL_API_URL", "VDK_HEARTBEAT_CONTROL_SERVICE_URL"
@@ -405,6 +405,10 @@ def run(job_input):
props['table_load_destination'] = "{self.config.DATABASE_TEST_TABLE_LOAD_DESTINATION}"
props['job_name'] = "{self.config.job_name}"
props['execute_template'] = "{self.config.check_template_execution}"
props['ingest_target'] = "{self.config.INGEST_TARGET}"
props['ingest_method'] = "{self.config.INGEST_METHOD}"
props['ingest_destination_table'] = "{self.config.INGEST_DESTINATION_TABLE}"
props['ingest_timeout'] = "{self.config.INGEST_TIMEOUT}"
job_input.set_all_properties(props)
"""

@@ -40,12 +40,12 @@ def execute_test(self):
         start_time = time.time()
         caught_exception = None
         while time.time() - start_time < self.config.RUN_TEST_TIMEOUT_SECONDS:
-            log.info(f"Search for job property to set 'now' property.")
+            log.info(f"Search for job property to set 'succeeded' property.")
             try:
                 props = self.__job_controller.get_job_properties()
-                if props and "now" in props:
+                if props and "succeeded" in props:
                     log.info(
-                        f"Data Job has recorded successfully property 'now' = {props['now']}"
+                        f"Data Job has recorded successfully property 'succeeded' = {props['succeeded']}"
                     )
                     return
                 else:
@@ -68,6 +68,6 @@ def execute_test(self):
         raise AssertionError(
             "Simple test failed with timeout. "
             f"It was waiting for data job {self.config.job_name} to update its job properties "
-            f"with key 'now' and value - current time. But the job did not do it in time. "
+            f"with key 'succeeded'. But the job did not do it in time. "
             f"Check data job logs for possible errors."
         )
@@ -0,0 +1,24 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import uuid

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    props = job_input.get_all_properties()
    props["id1"] = str(uuid.uuid4())
    props["id2"] = str(uuid.uuid4())
    props["id3"] = str(uuid.uuid4())

    # Test creates dynamically file 06_override_properties.py
    # which will override the following properties
    props["ingest_destination_table"] = "destination_table"
    props["ingest_method"] = "http"
    props["ingest_target"] = "datasource"
    props["ingest_timeout"] = "300"
    props["db"] = "memory.default"
    job_input.set_all_properties(props)
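
The comment in the step above says the test generates `06_override_properties.py` to override these placeholder values. The sketch below shows one way such a step could be rendered from a config object. It is not the code from this change: the function name `render_override_step` is hypothetical, and `SimpleNamespace` stands in for the real heartbeat config. It uses `!r` so that values containing quotes still produce valid Python source.

```python
from types import SimpleNamespace


def render_override_step(config):
    """Return Python source for a step that overwrites the ingestion properties."""
    return (
        "def run(job_input):\n"
        "    props = job_input.get_all_properties()\n"
        f"    props['ingest_target'] = {config.INGEST_TARGET!r}\n"
        f"    props['ingest_method'] = {config.INGEST_METHOD!r}\n"
        f"    props['ingest_destination_table'] = {config.INGEST_DESTINATION_TABLE!r}\n"
        f"    props['ingest_timeout'] = {config.INGEST_TIMEOUT!r}\n"
        "    job_input.set_all_properties(props)\n"
    )


if __name__ == "__main__":
    # Hypothetical config values, matching the "Trino ingestion" README scenario.
    example_config = SimpleNamespace(
        INGEST_TARGET="trino-http-datasource",
        INGEST_METHOD="TRINO",
        INGEST_DESTINATION_TABLE="sample_destination_table",
        INGEST_TIMEOUT="300",
    )
    print(render_override_step(example_config))
```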
@@ -0,0 +1,21 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    props = job_input.get_all_properties()
    payload = {"id1": props["id1"], "id2": props["id2"], "id3": props["id3"]}

    # Ingest the data
    log.info(f"Sending the following payload for ingestion: {payload}")
    job_input.send_object_for_ingestion(
        payload=payload,
        destination_table=props["ingest_destination_table"],
        method=props["ingest_method"],
        target=props["ingest_target"],
    )
@@ -0,0 +1,45 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import time

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    # If db property includes catalog name, get only the schema name ('default' from 'memory.default').
    # This is done because the templates' implementation escapes db, table and column names passed as arguments, so that
    # reserved words could be used for their values
    db_name = job_input.get_property("db").split(".")[-1]

    props = job_input.get_all_properties()
    destination_table = props["ingest_destination_table"]
    id1 = props["id1"]
    id2 = props["id2"]
    id3 = props["id3"]

    timeout = int(props["ingest_timeout"])  # [seconds]
    timeout_start = time.time()

    while time.time() < timeout_start + timeout:
        # check if data was correctly ingested
        result = job_input.execute_query(
            f"SELECT id1, id2, id3 FROM {db_name}.{destination_table} "
            f"WHERE id1 = '{id1}' "
            f"AND id2 = '{id2}' "
            f"AND id3 = '{id3}'"
        )
        log.info(f"Query result: {result}")

        if result:
            if result[0][0] != id1 or result[0][1] != id2 or result[0][2] != id3:
                raise Exception("The data is not ingested correctly")
            else:
                props["succeeded"] = "true"
                break
        else:
            time.sleep(10)

    job_input.set_all_properties(props)
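
The verification step above is a poll-with-timeout loop: it repeats a query until a row appears or `ingest_timeout` seconds pass. A minimal, generic sketch of that pattern follows. The helper `poll_until` and its parameters are hypothetical and not part of this change. It uses `time.monotonic` rather than `time.time` so that wall-clock adjustments cannot shorten or lengthen the wait, and it accepts the clock and sleep functions as arguments so the loop can be exercised without real delays.

```python
import time


def poll_until(check, timeout_seconds, interval_seconds=10,
               clock=time.monotonic, sleep=time.sleep):
    """Call check() until it returns a truthy value or the timeout elapses.

    Returns the first truthy result, or None if the timeout is reached.
    """
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        result = check()
        if result:
            return result
        sleep(interval_seconds)
    return None


if __name__ == "__main__":
    attempts = []

    def fake_query():
        # Pretend the ingested row becomes visible on the third attempt.
        attempts.append(1)
        return [("id1", "id2", "id3")] if len(attempts) >= 3 else []

    rows = poll_until(fake_query, timeout_seconds=5, interval_seconds=0)
    print(f"rows={rows} after {len(attempts)} attempts")
```

Returning `None` on timeout leaves the decision to the caller; in the step above, the equivalent outcome is that the `succeeded` property is never set and the heartbeat test eventually times out.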
@@ -0,0 +1,11 @@
[owner]
team = taurus

[job]
schedule_cron = * * * * *

[contacts]
notified_on_job_failure_user_error=
notified_on_job_failure_platform_error=
notified_on_job_success=
notified_on_job_deploy=
@@ -0,0 +1 @@
vdk-trino
@@ -1,7 +1,6 @@
 # Copyright 2021 VMware, Inc.
 # SPDX-License-Identifier: Apache-2.0
 import logging
-import time

 from vdk.api.job_input import IJobInput

@@ -11,7 +10,7 @@
 def run(job_input: IJobInput):
     log.info(f"Get current properties ")
     props = job_input.get_all_properties()
-    props["now"] = time.time()
+    props["succeeded"] = "true"
     log.info(f"Save new properties ")
     job_input.set_all_properties(props)
-    log.info(f"Updated property now to {props['now']}")
+    log.info(f"Updated property succeeded to {props['succeeded']}")