Skip to content

Commit

Permalink
vdk-heartbeat: trino ingestion test
Browse files Browse the repository at this point in the history
The test sends an object for ingestion and then verifies
that the object has been ingested.

Testing: tested manually against ETL and Trino

Signed-off-by: Miroslav Ivanov [email protected]
  • Loading branch information
mivanov1988 committed Nov 15, 2021
1 parent 1071b40 commit 3d5367d
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 8 deletions.
13 changes: 13 additions & 0 deletions projects/vdk-heartbeat/src/vdk/internal/heartbeat/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ def __init__(self, config_file=None):
self.get_value("RUN_TEST_TIMEOUT_SECONDS", "1200")
)

# Ingestion configuration
self.INGEST_TARGET = self.get_value(
"INGEST_TARGET",
"vdk-heartbeat-datasource",
)

self.INGEST_METHOD = self.get_value("INGEST_METHOD", "http")

self.INGEST_DESTINATION_TABLE = self.get_value(
"INGEST_DESTINATION_TABLE",
"vdk_heartbeat_ingestion_test",
)

# The Control Service API URL (http://url/data-jobs) without data-jobs suffix
self.control_api_url = self._get_atleast_one_value(
"CONTROL_API_URL", "VDK_HEARTBEAT_CONTROL_SERVICE_URL"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ def run(job_input):
props['table_load_destination'] = "{self.config.DATABASE_TEST_TABLE_LOAD_DESTINATION}"
props['job_name'] = "{self.config.job_name}"
props['execute_template'] = "{self.config.check_template_execution}"
props['ingest_target'] = "{self.config.INGEST_TARGET}"
props['ingest_method'] = "{self.config.INGEST_METHOD}"
props['ingest_destination_table'] = "{self.config.INGEST_DESTINATION_TABLE}"
job_input.set_all_properties(props)
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ def clean_up(self):

@LogDecorator(log)
def execute_test(self):
wait_time_seconds = 30
wait_time_seconds = 360
start_time = time.time()
caught_exception = None
while time.time() - start_time < self.config.RUN_TEST_TIMEOUT_SECONDS:
log.info(f"Search for job property to set 'now' property.")
log.info(f"Search for job property to set 'succeeded' property.")
try:
props = self.__job_controller.get_job_properties()
if props and "now" in props:
if props and "succeeded" in props:
log.info(
f"Data Job has recorded successfully property 'now' = {props['now']}"
f"Data Job has recorded successfully property 'succeeded' = {props['succeeded']}"
)
return
else:
Expand All @@ -68,6 +68,6 @@ def execute_test(self):
raise AssertionError(
"Simple test failed with timeout. "
f"It was waiting for data job {self.config.job_name} to update its job properties "
f"with key 'now' and value - current time. But the job did not do it in time. "
f"with key 'succeeded'. But the job did not do it in time. "
f"Check data job logs for possible errors."
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import uuid

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Store three freshly generated UUID strings in the job properties.

    Reads the current job properties, (over)writes the keys ``id1``, ``id2``
    and ``id3`` with new random UUID4 strings, and saves the whole set back.

    :param job_input: object providing ``get_all_properties`` and
        ``set_all_properties``.
    """
    properties = job_input.get_all_properties()
    for key in ("id1", "id2", "id3"):
        properties[key] = str(uuid.uuid4())
    job_input.set_all_properties(properties)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Send the ids stored in the job properties for ingestion.

    Builds a payload from the ``id1``, ``id2`` and ``id3`` job properties and
    passes it to ``send_object_for_ingestion``. The destination table, method
    and target are read from the ``ingest_destination_table``,
    ``ingest_method`` and ``ingest_target`` job properties.

    :param job_input: object providing ``get_all_properties`` and
        ``send_object_for_ingestion``.
    :raises KeyError: if any of the required properties is missing.
    """
    properties = job_input.get_all_properties()
    payload = {key: properties[key] for key in ("id1", "id2", "id3")}

    # Log the payload before handing it over, so it can be found in the job logs.
    log.info(f"Sending the following payload for ingestion: {payload}")

    table = properties["ingest_destination_table"]
    method = properties["ingest_method"]
    target = properties["ingest_target"]
    job_input.send_object_for_ingestion(
        payload=payload,
        destination_table=table,
        method=method,
        target=target,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import time

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Poll the destination table until the ingested row shows up.

    Queries ``<schema>.<table>`` for a row whose ``id1``, ``id2`` and ``id3``
    columns equal the values stored in the job properties. When the row is
    found, the ``succeeded`` property is set to ``"true"``. The properties are
    written back at the end in every case, so a timeout leaves ``succeeded``
    untouched.

    :param job_input: object providing ``get_property``,
        ``get_all_properties``, ``execute_query`` and ``set_all_properties``.
    :raises KeyError: if a required property is missing.
    :raises Exception: if a returned row does not match the expected ids.
    """
    # If the db property includes a catalog name, keep only the schema name
    # ('default' from 'memory.default').
    db_name = job_input.get_property("db").split(".")[-1]

    props = job_input.get_all_properties()
    # Query the table the ingestion step wrote to ("ingest_destination_table");
    # fall back to the legacy "destination_table" key when it is absent.
    destination_table = (
        props.get("ingest_destination_table") or props["destination_table"]
    )
    id1 = props["id1"]
    id2 = props["id2"]
    id3 = props["id3"]

    timeout_seconds = 300
    poll_interval_seconds = 10
    deadline = time.time() + timeout_seconds

    # The query does not change between attempts, so build it once.
    query = (
        f"SELECT id1, id2, id3 FROM {db_name}.{destination_table} "
        f"WHERE id1 = '{id1}' "
        f"AND id2 = '{id2}' "
        f"AND id3 = '{id3}'"
    )

    while time.time() < deadline:
        # check if data was correctly ingested
        result = job_input.execute_query(query)
        log.info(f"Query result: {result}")

        if result:
            row = result[0]
            # Any single mismatching column means the data is wrong.
            if row[0] != id1 or row[1] != id2 or row[2] != id3:
                raise Exception("The data is not ingested correctly")
            props["succeeded"] = "true"
            break

        time.sleep(poll_interval_seconds)
    else:
        # Deadline passed without finding the row; 'succeeded' is left unset.
        log.warning(
            f"Ingested data was not found in {db_name}.{destination_table} "
            f"within {timeout_seconds} seconds."
        )

    job_input.set_all_properties(props)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[owner]
team = taurus

[job]
schedule_cron = * * * * *

[contacts]
notified_on_job_failure_user_error=
notified_on_job_failure_platform_error=
notified_on_job_success=
notified_on_job_deploy=
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vdk-trino
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import time

from vdk.api.job_input import IJobInput

Expand All @@ -11,7 +10,7 @@
def run(job_input: IJobInput):
    """Mark the job run as successful via the job properties.

    Reads the current job properties, sets the ``succeeded`` key to
    ``"true"`` and saves the properties back.

    :param job_input: object providing ``get_all_properties`` and
        ``set_all_properties``.
    """
    log.info("Get current properties ")
    props = job_input.get_all_properties()
    props["succeeded"] = "true"
    log.info("Save new properties ")
    job_input.set_all_properties(props)
    # The message names the property that is actually written ('succeeded').
    log.info(f"Updated property succeeded to {props['succeeded']}")

0 comments on commit 3d5367d

Please sign in to comment.