From 0d04615cd7ff4415bb9c7104470995bc83bae743 Mon Sep 17 00:00:00 2001 From: sbuldeev Date: Fri, 7 Jul 2023 15:57:49 +0300 Subject: [PATCH 1/5] Truncate table before inserting data Why: Currently when the insert template is used with quality checks it only appends data to the staging table without truncating it at any moment. This will most probably lead to duplicated data on the following run, because old source data will be already in the staging table before appending the new one. More details explained in #1361 What: -Adding truncate statement before the data is being inserted in the staging table to ensure that no leftover data from previous runs is left. Signed-off-by: Stefan Buldeev sbuldeev@vmware.com --- .../templates/load/fact/insert/02-handle-quality-checks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-handle-quality-checks.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-handle-quality-checks.py index e5482aec05..b54ab2bc1d 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-handle-quality-checks.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-handle-quality-checks.py @@ -39,6 +39,8 @@ def run(job_input: IJobInput): align_stg_table_with_target(target_table_full_name, staging_table, job_input) + job_input.execute_query(f"TRUNCATE {staging_table}") + insert_into_staging = insert_query.format( target_schema=staging_schema, target_table=staging_table_name, From 565f0a4236ab3a8552fadc7b03a41a90a469fcd1 Mon Sep 17 00:00:00 2001 From: sbuldeev Date: Mon, 10 Jul 2023 16:42:05 +0300 Subject: [PATCH 2/5] Added test for staging table clean up --- .../functional/template_regression_test.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py index 6f14dbcdb7..d9c5ab9d6d 100644 --- a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -827,3 +827,48 @@ def test_insert_checks_negative(self) -> None: expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") assert actual_rs.output and expected_rs.output assert actual_rs.output != expected_rs.output + + def test_insert_clean_staging(self) -> None: + test_schema = "vdkprototypes" + staging_schema = "staging_vdkprototypes" + source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging" + target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging" + expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging" + + res_first_exec = self._run_job( + "insert_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "check": "use_positive_check", + "staging_schema": staging_schema, + }, + ) + staging_table_name = f"vdk_check_{test_schema}_{target_table}" + first_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + + res_second_exec = self._run_job( + "insert_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "check": "use_positive_check", + "staging_schema": staging_schema, + }, + ) + + second_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + first_exec = {x for x in first_exec_rs.output.split("\n")} + second_exec = {x for x in second_exec_rs.output.split("\n")} + + self.assertSetEqual( + first_exec, second_exec, f"Clean up of staging table is not made properly." + ) \ No newline at end of file From 5c5568a7d902659e81c7c0b2b0e3b61973adec7a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 10 Jul 2023 13:44:53 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../tests/functional/template_regression_test.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py index d9c5ab9d6d..14a3f32bb3 100644 --- a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -849,7 +849,9 @@ def test_insert_clean_staging(self) -> None: }, ) staging_table_name = f"vdk_check_{test_schema}_{target_table}" - first_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + first_exec_rs = self._run_query( + f"SELECT * FROM {staging_schema}.{staging_table_name}" + ) res_second_exec = self._run_job( "insert_template_job", @@ -865,10 +867,12 @@ def test_insert_clean_staging(self) -> None: }, ) - second_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + second_exec_rs = self._run_query( + f"SELECT * FROM {staging_schema}.{staging_table_name}" + ) first_exec = {x for x in first_exec_rs.output.split("\n")} second_exec = {x for x in second_exec_rs.output.split("\n")} self.assertSetEqual( first_exec, second_exec, f"Clean up of staging table is not made properly." - ) \ No newline at end of file + ) From cc9b6cea18803cabde8ed497908cb10e1f72130b Mon Sep 17 00:00:00 2001 From: sbuldeev Date: Tue, 11 Jul 2023 14:00:51 +0300 Subject: [PATCH 4/5] Improve test for staging table clean up --- .../vdk-impala/tests/functional/template_regression_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py index d9c5ab9d6d..9bab8aed3b 100644 --- a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -14,6 +14,7 @@ from vdk.internal.core import errors from vdk.plugin.impala import impala_plugin from vdk.plugin.test_utils.util_funcs import cli_assert +from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.test_utils.util_funcs import get_test_job_path @@ -850,6 +851,7 @@ def test_insert_clean_staging(self) -> None: ) staging_table_name = f"vdk_check_{test_schema}_{target_table}" first_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + cli_assert_equal(0, res_first_exec) res_second_exec = self._run_job( "insert_template_job", @@ -864,11 +866,12 @@ def test_insert_clean_staging(self) -> None: "staging_schema": staging_schema, }, ) + cli_assert_equal(0, res_second_exec) second_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") first_exec = {x for x in first_exec_rs.output.split("\n")} second_exec = {x for x in second_exec_rs.output.split("\n")} self.assertSetEqual( - first_exec, second_exec, f"Clean up of staging table is not made properly." + first_exec, second_exec, f"Clean up of staging table - {staging_table_name} is not made properly. Different data was found in the table after consecutive executions." ) \ No newline at end of file From 8db0c755dcf36965b21aca7b64f5fb000eb49bfa Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 11 Jul 2023 11:06:24 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../tests/functional/template_regression_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py index e80ae18c6c..6de220e5c1 100644 --- a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -850,7 +850,9 @@ def test_insert_clean_staging(self) -> None: }, ) staging_table_name = f"vdk_check_{test_schema}_{target_table}" - first_exec_rs = self._run_query(f"SELECT * FROM {staging_schema}.{staging_table_name}") + first_exec_rs = self._run_query( + f"SELECT * FROM {staging_schema}.{staging_table_name}" + ) cli_assert_equal(0, res_first_exec) res_second_exec = self._run_job( @@ -875,5 +877,7 @@ def test_insert_clean_staging(self) -> None: second_exec = {x for x in second_exec_rs.output.split("\n")} self.assertSetEqual( - first_exec, second_exec, f"Clean up of staging table - {staging_table_name} is not made properly. Different data was found in the table after consecutive executions." + first_exec, + second_exec, + f"Clean up of staging table - {staging_table_name} is not made properly. Different data was found in the table after consecutive executions.", )