Skip to content

Commit

Permalink
vdk-impala: Truncate table before inserting data (#2369)
Browse files Browse the repository at this point in the history
Why:
Currently when the insert template is used with quality checks it only
appends data to the staging table without truncating it at any moment.
This will most probably lead to duplicated data on the following run,
because old source data will be already in the staging table before
appending the new one.

More details explained in
#1361

What:
-Adding truncate statement before the data is being inserted in the
staging table to ensure that no leftover data from previous runs is
left.

---------

Signed-off-by: Stefan Buldeev [email protected]
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
sbuldeev and pre-commit-ci[bot] authored Jul 12, 2023
1 parent 9d7ccc5 commit 3dc95ac
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def run(job_input: IJobInput):

align_stg_table_with_target(target_table_full_name, staging_table, job_input)

job_input.execute_query(f"TRUNCATE {staging_table}")

insert_into_staging = insert_query.format(
target_schema=staging_schema,
target_table=staging_table_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from vdk.internal.core import errors
from vdk.plugin.impala import impala_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import get_test_job_path

Expand Down Expand Up @@ -827,3 +828,56 @@ def test_insert_checks_negative(self) -> None:
expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}")
assert actual_rs.output and expected_rs.output
assert actual_rs.output != expected_rs.output

def test_insert_clean_staging(self) -> None:
test_schema = "vdkprototypes"
staging_schema = "staging_vdkprototypes"
source_view = "vw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"
target_table = "dw_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"
expect_table = "ex_fact_vmc_utilization_cpu_mem_every5min_daily_clean_staging"

res_first_exec = self._run_job(
"insert_template_job",
{
"source_schema": test_schema,
"source_view": source_view,
"target_schema": test_schema,
"target_table": target_table,
"expect_schema": test_schema,
"expect_table": expect_table,
"check": "use_positive_check",
"staging_schema": staging_schema,
},
)
staging_table_name = f"vdk_check_{test_schema}_{target_table}"
first_exec_rs = self._run_query(
f"SELECT * FROM {staging_schema}.{staging_table_name}"
)
cli_assert_equal(0, res_first_exec)

res_second_exec = self._run_job(
"insert_template_job",
{
"source_schema": test_schema,
"source_view": source_view,
"target_schema": test_schema,
"target_table": target_table,
"expect_schema": test_schema,
"expect_table": expect_table,
"check": "use_positive_check",
"staging_schema": staging_schema,
},
)
cli_assert_equal(0, res_second_exec)

second_exec_rs = self._run_query(
f"SELECT * FROM {staging_schema}.{staging_table_name}"
)
first_exec = {x for x in first_exec_rs.output.split("\n")}
second_exec = {x for x in second_exec_rs.output.split("\n")}

self.assertSetEqual(
first_exec,
second_exec,
f"Clean up of staging table - {staging_table_name} is not made properly. Different data was found in the table after consecutive executions.",
)

0 comments on commit 3dc95ac

Please sign in to comment.