Skip to content

Commit

Permalink
vdk-trino: Add unit tests for templates using reserved words
Browse files Browse the repository at this point in the history
Trino templates can work with identifiers which are
reserved Trino keywords.

A unit test is added for each template (scd1, scd2 and
periodic snapshot) with reserved keywords as table and
column names.

Tested by unit tests.

Signed-off-by: Yana Zhivkova <[email protected]>
  • Loading branch information
YanaZhivkova committed Aug 24, 2021
1 parent 7aba991 commit 42172f5
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
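Load example input data for an scd2 template test in which table and column
names may be reserved Trino keywords (identifiers are therefore double-quoted).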
"""
from taurus.api.job_input import IJobInput
from taurus.vdk.trino_utils import TrinoTemplateQueries


def run(job_input: IJobInput) -> None:
    """
    Recreate and populate the target, source and expect tables of the scd2 test.

    For each of the three tables a DROP, a CREATE and an INSERT statement are
    passed, in that order, to ``job_input.execute_query``. The ``{...}``
    placeholders are left untouched in the SQL text; nothing in this function
    substitutes them (presumably ``execute_query`` resolves them from the job
    arguments - confirm against the IJobInput implementation).

    :param job_input: object whose ``execute_query`` runs each statement.
    """
    # The statements are order-dependent: every table is dropped, then created,
    # then filled, and the tables are handled one after another.
    statements = (
        # Step 1: create a table that represents the current state
        """
DROP TABLE IF EXISTS "{target_schema}"."{target_table}"
""",
        """
CREATE TABLE IF NOT EXISTS "{target_schema}"."{target_table}" (
"{sk_column}" VARCHAR,
{active_from_column} TIMESTAMP,
{active_to_column} TIMESTAMP,
"{id_column}" INT,
"with" INT,
state VARCHAR,
is_next BOOLEAN,
cloud_vendor VARCHAR,
version SMALLINT
)
""",
        """
INSERT INTO "{target_schema}"."{target_table}" VALUES
('sddc01-v01', TIMESTAMP '2019-01-01', TIMESTAMP '9999-12-31', 1, 7, 'RUNNING' , false, 'Azure', 498),
('sddc02-v01', TIMESTAMP '2019-02-01', TIMESTAMP '9999-12-31', 2, 9, 'STOPPED' , false, 'AWS' , 500),
('sddc03-v01', TIMESTAMP '2019-03-01', TIMESTAMP '9999-12-31', 3, 3, 'PROVISIONING', false, 'Azure', 497),
('sddc04-v01', TIMESTAMP '2019-04-01', TIMESTAMP '9999-12-31', 4, 5, 'PROVISIONING', true , 'Azure', 498),
('sddc05-v01', TIMESTAMP '2019-05-01', TIMESTAMP '2019-05-02', 5, 9, 'STARTING' , true , 'AWS' , 500),
('sddc05-v02', TIMESTAMP '2019-05-02', TIMESTAMP '2019-05-03', 5, 2, 'STARTING' , true , 'AWS' , 500),
('sddc05-v03', TIMESTAMP '2019-05-03', TIMESTAMP '9999-12-31', 5, 3, 'STARTING' , true , 'AWS' , 500)
""",
        # Step 2: create a table that represents the delta to be applied
        """
DROP TABLE IF EXISTS "{source_schema}"."{source_view}"
""",
        """
CREATE TABLE IF NOT EXISTS "{source_schema}"."{source_view}" (
{updated_at_column} TIMESTAMP,
"{id_column}" INT,
"with" INT,
state VARCHAR,
is_next BOOLEAN,
cloud_vendor VARCHAR,
version SMALLINT
)
""",
        """
INSERT INTO "{source_schema}"."{source_view}" VALUES
(TIMESTAMP '2019-02-02', 2, 1, 'STARTING' , false, 'AWS' , 500), -- Update (1) - new time, new values
(TIMESTAMP '2019-03-01', 3, 4, 'RUNNING' , false, 'Azure', 497), -- Update (2) - same time, new values
(TIMESTAMP '2019-04-02', 4, 5, 'PROVISIONING', true , 'Azure', 498), -- Update (3) - new time, same values
(TIMESTAMP '2019-05-01', 5, 9, 'STARTING' , true , 'AWS' , 500), -- Update (4) - same time, same values
(TIMESTAMP '2019-05-02', 5, 9, 'STARTING' , true , 'AWS' , 500), -- Update (5) - same time, prev values
(TIMESTAMP '2019-05-04', 5, 9, 'STARTING' , true , 'AWS' , 500), -- Update (1) - new time, new values
(TIMESTAMP '2019-06-01', 6, 9, 'STARTING' , true , 'AWS' , 499) -- Insert
""",
        # Step 3: create a table containing the state expected after updating
        # the current state with the given delta
        """
DROP TABLE IF EXISTS "{expect_schema}"."{expect_table}"
""",
        """
CREATE TABLE IF NOT EXISTS "{expect_schema}"."{expect_table}" (
"{sk_column}" VARCHAR,
{active_from_column} TIMESTAMP,
{active_to_column} TIMESTAMP,
"{id_column}" INT,
"with" INT,
state VARCHAR,
is_next BOOLEAN,
cloud_vendor VARCHAR,
version SMALLINT
)
""",
        """
INSERT INTO "{expect_schema}"."{expect_table}" VALUES
('sddc01-v01', TIMESTAMP '2019-01-01', TIMESTAMP '9999-12-31', 1, 7, 'RUNNING' , false, 'Azure', 498),
('sddc02-v01', TIMESTAMP '2019-02-01', TIMESTAMP '2019-02-02', 2, 9, 'STOPPED' , false, 'AWS' , 500),
('sddc02-v02', TIMESTAMP '2019-02-02', TIMESTAMP '9999-12-31', 2, 1, 'STARTING' , false, 'AWS' , 500),
('sddc03-v01', TIMESTAMP '2019-03-01', TIMESTAMP '9999-12-31', 3, 4, 'RUNNING' , false, 'Azure', 497),
('sddc04-v01', TIMESTAMP '2019-04-01', TIMESTAMP '9999-12-31', 4, 5, 'PROVISIONING', true , 'Azure', 498),
('sddc05-v01', TIMESTAMP '2019-05-01', TIMESTAMP '2019-05-03', 5, 9, 'STARTING' , true , 'AWS' , 500),
('sddc05-v03', TIMESTAMP '2019-05-03', TIMESTAMP '2019-05-04', 5, 3, 'STARTING' , true , 'AWS' , 500),
('sddc05-v04', TIMESTAMP '2019-05-04', TIMESTAMP '9999-12-31', 5, 9, 'STARTING' , true , 'AWS' , 500),
('sddc06-v01', TIMESTAMP '2019-06-01', TIMESTAMP '9999-12-31', 6, 9, 'STARTING' , true , 'AWS' , 499)
""",
    )

    for sql in statements:
        job_input.execute_query(sql)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from taurus.api.job_input import IJobInput


def run(job_input: IJobInput) -> None:
    """
    Execute the ``scd2`` template with this job's arguments and surface failures.

    :param job_input: object providing ``get_arguments`` and ``execute_template``.
    :raises Exception: the exception carried by the template result, when the
        result reports failure and provides one.
    :raises RuntimeError: when the result reports failure but carries no
        exception, so that a failed template is never treated as a success.
    """
    result = job_input.execute_template(
        template_name="scd2",
        template_args=job_input.get_arguments(),
    )
    if not result.is_failed():
        return

    # Fetch the exception once and re-raise it when the template provided one.
    error = result.get_exception()
    if error:
        raise error
    # A failed result without an exception used to fall through silently.
    raise RuntimeError("Template 'scd2' failed without reporting an exception.")
163 changes: 138 additions & 25 deletions projects/vdk-core/plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,59 @@ def test_scd1_template(self) -> None:
actual_rs.output == expected_rs.output
), f"Elements in {source_view} and {target_table} differ."

def test_scd1_template_reserved_args(self) -> None:
    """
    Run the scd1 template job with reserved keywords as table names.

    The job is invoked with ``alter`` as the source view and ``table`` as the
    target table; afterwards both are queried and their CLI output must match.
    """
    source_schema = "default"
    source_view = "alter"
    target_schema = "default"
    target_table = "table"

    # Resolve the test job relative to this test module's directory.
    tests_dir = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
    job_path = get_test_job_path(tests_dir, "load_dimension_scd1_template_job")
    job_arguments = json.dumps(
        {
            "source_schema": source_schema,
            "source_view": source_view,
            "target_schema": target_schema,
            "target_table": target_table,
        }
    )

    result: Result = self.__runner.invoke(
        ["run", job_path, "--arguments", job_arguments]
    )

    cli_assert_equal(0, result)

    # Identifiers are double-quoted because they are reserved keywords.
    target_query = f"""
SELECT * FROM "{target_schema}"."{target_table}"
"""
    actual_rs: Result = self.__runner.invoke(
        ["trino-query", "--query", target_query]
    )

    source_query = f"""
SELECT * FROM "{source_schema}"."{source_view}"
"""
    expected_rs: Result = self.__runner.invoke(
        ["trino-query", "--query", source_query]
    )

    cli_assert_equal(0, actual_rs)
    cli_assert_equal(0, expected_rs)
    assert (
        actual_rs.output == expected_rs.output
    ), f"Elements in {source_view} and {target_table} differ."

def test_scd2_template(self) -> None:
test_schema = "default"
source_view = "vw_scmdb_people"
Expand All @@ -129,6 +182,25 @@ def test_scd2_template(self) -> None:
1, self.__template_table_exists(test_schema, "backup_" + target_table)
)

def test_scd2_template_reserved_args(self) -> None:
    """
    Run the scd2 template with reserved keywords as table and column names.

    Uses ``alter``, ``table`` and ``between`` as the source view, target table
    and expect table, and switches both helpers to their reserved-keyword
    variant through the boolean ``reserved`` flag.
    """
    test_schema = "default"
    source_view = "alter"
    target_table = "table"
    expect_table = "between"

    # Pass the flags by keyword as real booleans rather than relying on a
    # truthy magic string in a positional slot.
    result: Result = self.__scd2_template_execute(
        test_schema,
        source_view,
        target_table,
        expect_table,
        restore_from_backup=False,
        reserved=True,
    )
    cli_assert_equal(0, result)

    # Check if we got the expected result and successfully dropped backup
    self.__scd2_template_check_expected_res(
        test_schema, target_table, expect_table, reserved=True
    )
    # Exit code 1 is expected from the existence check on the backup table
    # (presumably meaning the table is absent - confirm against the helper).
    cli_assert_equal(
        1, self.__template_table_exists(test_schema, "backup_" + target_table)
    )

def test_scd2_template_restore_target_from_backup_on_start(self) -> None:
test_schema = "default"
source_view = "vw_scmdb_people"
Expand Down Expand Up @@ -203,6 +275,21 @@ def test_fact_periodic_snapshot_template(self) -> None:
test_schema, target_table, expect_table
)

def test_fact_periodic_snapshot_template_reserved_args(self) -> None:
    """
    Run the periodic snapshot template with reserved keywords as table names.

    Executes the template with ``alter`` as source view, ``table`` as target
    table and ``between`` as expect table, requires a zero exit code, then
    delegates the result comparison to the shared check helper.
    """
    schema = "default"
    view, target, expected = "alter", "table", "between"

    result: Result = self.__fact_periodic_snapshot_template_execute(
        schema, view, target, expected
    )
    cli_assert_equal(0, result)

    self.__fact_periodic_snapshot_template_check_expected_res(
        schema, target, expected
    )

def test_fact_periodic_snapshot_empty_source(self) -> None:
test_schema = "default"
source_view = "vw_fact_sddc_daily"
Expand Down Expand Up @@ -373,13 +460,16 @@ def __scd2_template_execute(
target_table,
expect_table,
restore_from_backup=False,
reserved=False,
):
return self.__runner.invoke(
[
"run",
get_test_job_path(
pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
"load_dimension_scd2_template_job",
reserved
and "load_dimension_scd2_template_job_reserved_args"
or "load_dimension_scd2_template_job",
),
"--arguments",
json.dumps(
Expand All @@ -391,17 +481,17 @@ def __scd2_template_execute(
"staging_schema": test_schema,
"expect_schema": test_schema,
"expect_table": expect_table,
"id_column": "sddc_id",
"sk_column": "sddc_sk",
"id_column": reserved and "when" or "sddc_id",
"sk_column": reserved and "where" or "sddc_sk",
"value_columns": [
"updated_by_user_id",
reserved and "with" or "updated_by_user_id",
"state",
"is_next",
"cloud_vendor",
"version",
],
"tracked_columns": [
"updated_by_user_id",
reserved and "with" or "updated_by_user_id",
"state",
"is_next",
"version",
Expand All @@ -420,30 +510,53 @@ def __scd2_template_execute(
)

def __scd2_template_check_expected_res(
self, test_schema, target_table, expect_table
self, test_schema, target_table, expect_table, reserved=False
) -> None:
# don't check first (surrogate key) column from the two results,
# as those are uniquely generated and might differ

actual_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""SELECT active_from, active_to, sddc_id, updated_by_user_id, state, is_next, cloud_vendor, version
FROM "{test_schema}"."{target_table}"
ORDER BY sddc_id, active_to""",
]
)

expected_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""SELECT active_from, active_to, sddc_id, updated_by_user_id, state, is_next, cloud_vendor, version
FROM "{test_schema}"."{expect_table}"
ORDER BY sddc_id, active_to""",
]
)
if reserved:
actual_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""
SELECT active_from, active_to, "when", "with", state, is_next, cloud_vendor, version
FROM "{test_schema}"."{target_table}"
ORDER BY "when", active_to
""",
]
)

expected_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""SELECT active_from, active_to, "when", "with", state, is_next, cloud_vendor, version
FROM "{test_schema}"."{expect_table}"
ORDER BY "when", active_to""",
]
)
else:
actual_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""SELECT active_from, active_to, sddc_id, updated_by_user_id, state, is_next, cloud_vendor, version
FROM "{test_schema}"."{target_table}"
ORDER BY sddc_id, active_to""",
]
)

expected_rs: Result = self.__runner.invoke(
[
"trino-query",
"--query",
f"""SELECT active_from, active_to, sddc_id, updated_by_user_id, state, is_next, cloud_vendor, version
FROM "{test_schema}"."{expect_table}"
ORDER BY sddc_id, active_to""",
]
)

cli_assert_equal(0, actual_rs)
cli_assert_equal(0, expected_rs)
Expand Down

0 comments on commit 42172f5

Please sign in to comment.