diff --git a/projects/vdk-core/plugins/vdk-trino/MANIFEST.in b/projects/vdk-core/plugins/vdk-trino/MANIFEST.in index e73d459a6d..c33aa1c862 100644 --- a/projects/vdk-core/plugins/vdk-trino/MANIFEST.in +++ b/projects/vdk-core/plugins/vdk-trino/MANIFEST.in @@ -1,4 +1,5 @@ include src/taurus/vdk/templates/load/dimension/**/* +include src/taurus/vdk/templates/load/fact/**/* # this only work for source distribution (setup.py sdist) and not for binary distribution (bdist) # https://packaging.python.org/guides/using-manifest-in/ diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/00-verify-valid-target.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/00-verify-valid-target.py index 85e1960066..6e51f55de4 100644 --- a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/00-verify-valid-target.py +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/00-verify-valid-target.py @@ -4,17 +4,17 @@ from taurus.api.job_input import IJobInput from taurus.vdk.core import errors -from taurus.vdk.trino_utils import TrinoQueries +from taurus.vdk.trino_utils import TrinoTemplateQueries log = logging.getLogger(__name__) def run(job_input: IJobInput): """ - In this step we try to recover unexistent target table from backup. + In this step we try to recover potentially unexistent target table from backup. In some cases the template might fail during the step where new data is written in target table - (last step where tmp_target_table is renamed to target_table). If this happens, the job fails and - target table is no longer present. Fortunately it has a backup (backup_target_table). + (last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and + target table is no longer present. Fortunately it has a backup. 
So when the job is retried, this first step should recover the target (if the reason for the previous fail is no longer present). """ @@ -22,48 +22,6 @@ def run(job_input: IJobInput): args = job_input.get_arguments() target_schema = args.get("target_schema") target_table = args.get("target_table") - tmp_target_table = "tmp_" + target_table - backup_target_table = "backup_" + target_table + trino_queries = TrinoTemplateQueries(job_input) - trino_queries = TrinoQueries(job_input) - - if not trino_queries.table_exists(target_schema, target_table): - log.debug("If there is backup, try to recover target from it") - if trino_queries.table_exists(target_schema, backup_target_table): - log.debug("Try to recover target from backup") - try: - trino_queries.move_data_to_table( - target_schema, backup_target_table, target_schema, target_table - ) - log.info( - f"""Successfully recovered {target_schema}.{target_table} from " - "{target_schema}.{backup_target_table}""" - ) - except Exception as e: - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, - log=log, - what_happened=f"Target table is unexistent and recovering it from backup table failed with " - f"exception: {e}", - why_it_happened=f"""One of the previous job retries failed after dropping " - "{target_schema}.{target_table} and before renaming " - "{target_schema}.{tmp_target_table} to " - "{target_schema}.{target_table}.""", - consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", - countermeasures="You could try to recover {target_schema}.{target_table} from" - "{target_schema}.{backup_target_table} by hand and then " - "rerun the job." 
- "", - ) - - # if there is no target and no backup, the user provided invalid target table - # TODO: create target table automatically if not provided by user - else: - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.USER_ERROR, - log=log, - what_happened="Cannot find target table", - why_it_happened=f"Template is called for unexistent target table: {target_schema}.{target_table}", - consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", - countermeasures="Provide valid target table arguments.", - ) + trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-move-data-from-tmp-to-target.py.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-move-data-from-tmp-to-target.py.py new file mode 100644 index 0000000000..76c7945bfd --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-move-data-from-tmp-to-target.py.py @@ -0,0 +1,36 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from taurus.api.job_input import IJobInput +from taurus.vdk.core import errors +from taurus.vdk.trino_utils import TrinoTemplateQueries + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + """ + In this step we try to move data from tmp_target_table (where we populated the result data in the previous step) + to target table in the following way: + 1. Move data from target_table to a backup table + 2. Try to move data from tmp_target_table to target_table + 3. If 2 fails, try to restore target from backup + 4. If 3 succeeds, drop tmp target. The job fails. + 5. If 3 fails, target table is lost, its content are in the backup table. Next job retry will try to + recover target on its first step. + 6. If 2 succeeds, drop backup, we are ready. 
+ """ + + args = job_input.get_arguments() + target_schema = args.get("target_schema") + target_table = args.get("target_table") + tmp_target_table = "tmp_" + target_table + trino_queries = TrinoTemplateQueries(job_input) + + trino_queries.perform_safe_move_data_to_table_step( + from_db=target_schema, + from_table_name=tmp_target_table, + to_db=target_schema, + to_table_name=target_table, + ) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-rename-tmp-as-target.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-rename-tmp-as-target.py deleted file mode 100644 index 7d885616c5..0000000000 --- a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/dimension/scd2/07-rename-tmp-as-target.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright (c) 2021 VMware, Inc. -# SPDX-License-Identifier: Apache-2.0 -import logging - -from taurus.api.job_input import IJobInput -from taurus.vdk.core import errors -from taurus.vdk.trino_utils import TrinoQueries - -log = logging.getLogger(__name__) - - -def run(job_input: IJobInput): - """ - In this step we try to rename tmp_target_table (where we populated the result data in the previous step) to - target table in the following way: - 1. Rename target_table to backup_target_table - 2. Try to rename tmp_target_table to target_table - 3. If 2 fails, try to restore target from backup - 4. If 3 succeeds, drop tmp target. The job fails. - 5. If 3 fails, target table is lost, its content are in backup_target_table. Next job retry will try to - recover target on its first step. - 6. If 2 succeeds, drop backup, we are ready. 
- """ - - args = job_input.get_arguments() - target_schema = args.get("target_schema") - target_table = args.get("target_table") - tmp_target_table = "tmp_" + target_table - backup_target_table = "backup_" + target_table - - trino_queries = TrinoQueries(job_input) - - log.debug("Create backup from target") - trino_queries.move_data_to_table( - from_db=target_schema, - from_table_name=target_table, - to_db=target_schema, - to_table_name=backup_target_table, - ) - try: - log.debug("Create target from tmp target") - result = trino_queries.move_data_to_table( - from_db=target_schema, - from_table_name=tmp_target_table, - to_db=target_schema, - to_table_name=target_table, - ) - except Exception as e: - result = None - if _try_recover_target_from_backup( - trino_queries, target_schema, target_table, backup_target_table - ): - trino_queries.drop_table(target_schema, tmp_target_table) - raise - else: - errors.log_and_throw( - to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, - log=log, - what_happened=f"""Recovering target from backup table failed. " - "Table {target_schema}.{target_table} is lost!""", - why_it_happened=f"""Step with renaming tmp table to target table failed, so recovery from backup was" - "initiated, but it also failed with error: {e}""", - consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", - countermeasures=f"""Please, try the steps bellow in the following order:\n" - "1. Try to rerun the data job OR\n" - "2. First try to recover {target_schema}.{target_table} from" - "{target_schema}.backup_{target_table} by manually executing:\n" - "CREATE TABLE {target_schema}.{target_table} (LIKE {target_schema}.backup_{target_table})\n" - "INSERT INTO {target_schema}.{target_table} SELECT * FROM {target_schema}.backup_{target_table}\n" - "Then try to rerun the data job OR\n" - "3. 
Report the issue to support team.""", - ) - if result: - log.debug("Target table was successfully created, and we can drop backup") - trino_queries.drop_table(target_schema, backup_target_table) - - -def _try_recover_target_from_backup( - trino_queries: TrinoQueries, db: str, target_table: str, backup_table: str -): - log.debug("Try to recover target from backup") - try: - result = trino_queries.move_data_to_table( - from_db=db, - from_table_name=backup_table, - to_db=db, - to_table_name=target_table, - ) - except Exception as e: - result = None - pass - - return result diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/00-verify-valid-target.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/00-verify-valid-target.py new file mode 100644 index 0000000000..6e51f55de4 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/00-verify-valid-target.py @@ -0,0 +1,27 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from taurus.api.job_input import IJobInput +from taurus.vdk.core import errors +from taurus.vdk.trino_utils import TrinoTemplateQueries + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + """ + In this step we try to recover potentially unexistent target table from backup. + In some cases the template might fail during the step where new data is written in target table + (last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and + target table is no longer present. Fortunately it has a backup. + So when the job is retried, this first step should recover the target (if the reason for the previous fail + is no longer present). 
+ """ + + args = job_input.get_arguments() + target_schema = args.get("target_schema") + target_table = args.get("target_table") + trino_queries = TrinoTemplateQueries(job_input) + + trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/01-test-if-view-matches-target.sql b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/01-test-if-view-matches-target.sql new file mode 100644 index 0000000000..eea2a8effc --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/01-test-if-view-matches-target.sql @@ -0,0 +1,3 @@ +(SELECT * FROM {source_schema}.{source_view} LIMIT 0) +UNION ALL +(SELECT * FROM {target_schema}.{target_table} LIMIT 0) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/02-drop-tmp-target.sql b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/02-drop-tmp-target.sql new file mode 100644 index 0000000000..fe1d0b3301 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/02-drop-tmp-target.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS {target_schema}.tmp_{target_table} diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/03-drop-backup-target.sql b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/03-drop-backup-target.sql new file mode 100644 index 0000000000..7c260fb5e7 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/03-drop-backup-target.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS {target_schema}.backup_{target_table} diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/04-create-tmp-target.sql 
b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/04-create-tmp-target.sql new file mode 100644 index 0000000000..827022a50b --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/04-create-tmp-target.sql @@ -0,0 +1,3 @@ +CREATE TABLE {target_schema}.tmp_{target_table}( + LIKE {target_schema}.{target_table} +) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/05-insert-into-tmp-target.sql b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/05-insert-into-tmp-target.sql new file mode 100644 index 0000000000..cf0d167ab2 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/05-insert-into-tmp-target.sql @@ -0,0 +1,11 @@ +INSERT INTO {target_schema}.tmp_{target_table} +( + SELECT * + FROM {target_schema}.{target_table} + WHERE {last_arrival_ts} < (SELECT MIN({last_arrival_ts}) FROM {source_schema}.{source_view}) +) +UNION ALL +( + SELECT * + FROM {source_schema}.{source_view} +) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/06-move-data-from-tmp-to-target.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/06-move-data-from-tmp-to-target.py new file mode 100644 index 0000000000..4597080f4c --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/templates/load/fact/snapshot/06-move-data-from-tmp-to-target.py @@ -0,0 +1,56 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging + +from taurus.api.job_input import IJobInput +from taurus.vdk.core import errors +from taurus.vdk.trino_utils import TrinoTemplateQueries + +log = logging.getLogger(__name__) + + +def run(job_input: IJobInput): + """ + In this step we try to move data from tmp_target_table (where we populated the result data in the previous step) + to target table in the following way: + 1. 
Move data from target_table to a backup table + 2. Try to move data from tmp_target_table to target_table + 3. If 2 fails, try to restore target from backup + 4. If 3 succeeds, drop tmp target. The job fails. + 5. If 3 fails, target table is lost, its content are in the backup table. Next job retry will try to + recover target on its first step. + 6. If 2 succeeds, drop backup, we are ready. + + Note: If there is no data in tmp_target_table, we are sure that the source table provided initially was empty, + so we do nothing, target remains unchanged and we drop the empty tmp_target_table. + """ + + args = job_input.get_arguments() + target_schema = args.get("target_schema") + source_view = args.get("source_view") + target_table = args.get("target_table") + tmp_target_table = "tmp_" + target_table + trino_queries = TrinoTemplateQueries(job_input) + + log.debug("Check if tmp target has data.") + res = job_input.execute_query( + f""" + SELECT COUNT(*) FROM {target_schema}.{tmp_target_table} + """ + ) + if res and res[0][0] > 0: + log.debug( + "Confirmed that tmp target has data, proceed with moving it to target." + ) + trino_queries.perform_safe_move_data_to_table_step( + from_db=target_schema, + from_table_name=tmp_target_table, + to_db=target_schema, + to_table_name=target_table, + ) + else: + log.info( + f"Target table {target_schema}.{target_table} remains unchanged " + f"because source table {target_schema}.{source_view} was empty." 
+ ) + trino_queries.drop_table(target_schema, tmp_target_table) diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_plugin.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_plugin.py index 1682fe4678..43b184c2c9 100644 --- a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_plugin.py +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_plugin.py @@ -116,6 +116,11 @@ def initialize_job(context: JobContext) -> None: "scd2", pathlib.Path(get_job_path("load/dimension/scd2")) ) + # TODO: revise template name, what this template does currently is not snapshot (maybe rename to append-overwrite) + context.templates.add_template( + "snapshot", pathlib.Path(get_job_path("load/fact/snapshot")) + ) + @hookimpl(hookwrapper=True, trylast=True) def run_step(context: JobContext, step: Step) -> None: diff --git a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_utils.py b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_utils.py index 1f4e14795e..3ce0799064 100644 --- a/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_utils.py +++ b/projects/vdk-core/plugins/vdk-trino/src/taurus/vdk/trino_utils.py @@ -10,7 +10,7 @@ log = logging.getLogger(__name__) -class TrinoQueries: +class TrinoTemplateQueries: """ Allows to execute queries against Trino for a concrete job_input more easily. Provides a table_exists method (a command that Trino does not support). @@ -99,9 +99,134 @@ def drop_table(self, db: str, table_name: str): """ ) + def ensure_target_exists_step(self, db: str, target_name: str): + """ + This method checks if target exists. If it does not, an attempt to recover it from backup is initiated. + If there is no valid target at the end, error is raised. 
+ :param db: Schema of the target table + :param target_name: Name of the target table + :return: None + """ + backup_name = self.__get_backup_table_name(target_name) + if not self.table_exists(db, target_name): + log.debug("If there is backup, try to recover target from it") + if self.table_exists(db, backup_name): + log.debug("Try to recover target from backup") + try: + self.move_data_to_table(db, backup_name, db, target_name) + log.info( + f"""Successfully recovered {db}.{target_name} from {db}.{backup_name}""" + ) + except Exception as e: + errors.log_and_throw( + to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, + log=log, + what_happened=f"""Target table is unexistent and recovering it from backup table failed with " + "exception: {e}""", + why_it_happened=f"""One of the previous job retries failed after dropping " + "{db}.{target_name} and before moving data to it.""", + consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", + countermeasures=f"""You could try to recover {db}.{target_name} from" + "{db}.{backup_name} by hand and then rerun the job.""", + ) + + # if there is no target and no backup, the user provided invalid target table + else: + errors.log_and_throw( + to_be_fixed_by=errors.ResolvableBy.USER_ERROR, + log=log, + what_happened="Cannot find target table", + why_it_happened=f"Template is called for unexistent target table: {db}.{target_name}", + consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", + countermeasures="Provide valid target table arguments.", + ) + + def perform_safe_move_data_to_table_step( + self, + from_db: str, + from_table_name: str, + to_db: str, + to_table_name: str, + ): + """ + This method creates a backup table of the target, then tries to move data from source to target. + Source data is deleted in this process except in the situation when target data is lost.
+ If moving data fails, an attempt to recover target from backup is initiated. + :param from_db: Schema of the table we want to move data from + :param from_table_name: Name of the table we want to move data from + :param to_db: Schema of the table we want to move data to + :param to_table_name: Name of the table we want to move data to + :return: None + """ + log.debug("Create backup from target") + backup_table_name = self.__get_backup_table_name(to_table_name) + self.move_data_to_table( + from_db=to_db, + from_table_name=to_table_name, + to_db=to_db, + to_table_name=backup_table_name, + ) + try: + log.debug("Create target from tmp target") + result = self.move_data_to_table( + from_db=from_db, + from_table_name=from_table_name, + to_db=to_db, + to_table_name=to_table_name, + ) + except Exception as e: + result = None + if self.__try_recover_target_from_backup( + to_db, to_table_name, backup_table_name + ): + self.drop_table(from_db, from_table_name) + raise + else: + errors.log_and_throw( + to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, + log=log, + what_happened=f"""Recovering target from backup table failed. " + "Table {to_db}.{to_table_name} is lost!""", + why_it_happened=f"""Step with moving data from source to target table failed, so recovery from " + "backup was initiated, but it also failed with error: {e}""", + consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.", + countermeasures=f"""Please, try the steps below in the following order:\n" + "1. Try to rerun the data job OR\n" + "2. First try to recover {to_db}.{to_table_name} from" + "{to_db}.{backup_table_name} by manually executing:\n" + "CREATE TABLE {to_db}.{to_table_name} (LIKE {to_db}.{backup_table_name})\n" + "INSERT INTO {to_db}.{to_table_name} SELECT * FROM {to_db}.{backup_table_name}\n" + "Then try to rerun the data job OR\n" + "3.
Report the issue to support team.""", + ) + if result: + log.debug("Target table was successfully created, and we can drop backup") + self.drop_table(to_db, backup_table_name) + + def __try_recover_target_from_backup( + self, db: str, target_table: str, backup_table: str + ): + log.debug("Try to recover target from backup") + try: + result = self.move_data_to_table( + from_db=db, + from_table_name=backup_table, + to_db=db, + to_table_name=target_table, + ) + except Exception as e: + result = None + pass + + return result + @staticmethod def __is_table_not_found_error(exception): return ( isinstance(exception, TrinoUserError) and exception.error_name == "TABLE_NOT_FOUND" ) + + @staticmethod + def __get_backup_table_name(table_name): + return "backup_" + table_name diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py index 2ef1aff4dd..a4626fdabe 100644 --- a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py @@ -4,7 +4,7 @@ Load example input data for an scd2 template test. 
""" from taurus.api.job_input import IJobInput -from taurus.vdk.trino_utils import TrinoQueries +from taurus.vdk.trino_utils import TrinoTemplateQueries def run(job_input: IJobInput) -> None: @@ -130,7 +130,7 @@ def run(job_input: IJobInput) -> None: target_schema = args.get("target_schema") target_table = args.get("target_table") - trino_queries = TrinoQueries(job_input) + trino_queries = TrinoTemplateQueries(job_input) trino_queries.move_data_to_table( from_db=target_schema, from_table_name=target_table, diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py new file mode 100644 index 0000000000..f84e89aad1 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py @@ -0,0 +1,149 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for a snapshot template test. 
+""" +from taurus.api.job_input import IJobInput +from taurus.vdk.trino_utils import TrinoTemplateQueries + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + job_input.execute_query( + """ + DROP TABLE IF EXISTS {target_schema}.{target_table} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + job_input.execute_query( + """ + INSERT INTO {target_schema}.{target_table} VALUES + -- 2019-11-18 + ('sddc01-r01', 1, TIMESTAMP '2019-11-18', 5 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-18', 12, 3, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + -- 2019-11-19 + ('sddc01-r01', 1, TIMESTAMP '2019-11-19', 5 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-19', 4 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-19', 13, 3, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-19', 3 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc05-r02', 5, TIMESTAMP '2019-11-19', 20, 4, TIMESTAMP '2019-11-19 09:00:00') + """ + ) + + # Step 2: create a table that represents the next snapshot + + job_input.execute_query( + """ + DROP TABLE IF EXISTS {source_schema}.{source_view} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {source_schema}.{source_view} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + job_input.execute_query( + """ + INSERT INTO {source_schema}.{source_view} VALUES + -- 2019-11-18 + ('sddc05-r01', 5, TIMESTAMP 
'2019-11-18', 18, 4, TIMESTAMP '2019-11-18 09:30:00'), -- late arrival + -- 2019-11-19 (duplicated) + ('sddc01-r01', 1, TIMESTAMP '2019-11-19', 5 , 1, TIMESTAMP '2019-11-19 09:00:00'), -- duplicated + ('sddc02-r01', 2, TIMESTAMP '2019-11-19', 4 , 1, TIMESTAMP '2019-11-19 09:00:00'), -- duplicated + ('sddc03-r01', 3, TIMESTAMP '2019-11-19', 13, 3, TIMESTAMP '2019-11-19 09:00:00'), -- duplicated + ('sddc04-r01', 4, TIMESTAMP '2019-11-19', 3 , 1, TIMESTAMP '2019-11-19 09:00:00'), -- duplicated + ('sddc05-r02', 5, TIMESTAMP '2019-11-19', 20, 5, TIMESTAMP '2019-11-19 09:00:00'), -- changed + -- 2019-11-20 + ('sddc01-r01', 1, TIMESTAMP '2019-11-20', 10, 2, TIMESTAMP '2019-11-20 09:00:00'), -- new + ('sddc02-r02', 2, TIMESTAMP '2019-11-20', 7 , 1, TIMESTAMP '2019-11-20 09:00:00'), -- new + ('sddc03-r01', 3, TIMESTAMP '2019-11-20', 13, 3, TIMESTAMP '2019-11-20 09:00:00'), -- new + ('sddc04-r01', 4, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00'), -- new + ('sddc05-r04', 5, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00'), -- new + ('sddc06-r01', 1, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00') -- new + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the next snapshot + + job_input.execute_query( + """ + DROP TABLE IF EXISTS {expect_schema}.{expect_table} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {expect_schema}.{expect_table} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + job_input.execute_query( + """ + INSERT INTO {expect_schema}.{expect_table} VALUES + -- 2019-11-18 + ('sddc01-r01', 1, TIMESTAMP '2019-11-18', 5 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-18', 12, 3, TIMESTAMP '2019-11-18 09:00:00'), + 
('sddc04-r01', 4, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc05-r01', 5, TIMESTAMP '2019-11-18', 18, 4, TIMESTAMP '2019-11-18 09:30:00'), + -- 2019-11-19 (duplicated) + ('sddc01-r01', 1, TIMESTAMP '2019-11-19', 5 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-19', 4 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-19', 13, 3, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-19', 3 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc05-r02', 5, TIMESTAMP '2019-11-19', 20, 5, TIMESTAMP '2019-11-19 09:00:00'), + -- 2019-11-20 + ('sddc01-r01', 1, TIMESTAMP '2019-11-20', 10, 2, TIMESTAMP '2019-11-20 09:00:00'), + ('sddc02-r02', 2, TIMESTAMP '2019-11-20', 7 , 1, TIMESTAMP '2019-11-20 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-20', 13, 3, TIMESTAMP '2019-11-20 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00'), + ('sddc05-r04', 5, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00'), + ('sddc06-r01', 1, TIMESTAMP '2019-11-20', 3 , 1, TIMESTAMP '2019-11-20 09:00:00') + """ + ) + + # Step 4: Change target to backup, so that restoring from backup process would be triggered + + args = job_input.get_arguments() + if args.get("test_restore_from_backup") == "True": + job_input.execute_query( + """ + DROP TABLE IF EXISTS {target_schema}.backup_{target_table} + """ + ) + + target_schema = args.get("target_schema") + target_table = args.get("target_table") + trino_queries = TrinoTemplateQueries(job_input) + trino_queries.move_data_to_table( + from_db=target_schema, + from_table_name=target_table, + to_db=target_schema, + to_table_name="backup_" + target_table, + ) diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py new 
file mode 100644 index 0000000000..1513938527 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py @@ -0,0 +1,12 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from taurus.api.job_input import IJobInput + + +def run(job_input: IJobInput) -> None: + result = job_input.execute_template( + template_name="snapshot", + template_args=job_input.get_arguments(), + ) + if result.is_failed() and result.get_exception(): + raise result.get_exception() diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py new file mode 100644 index 0000000000..c0601389ae --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py @@ -0,0 +1,100 @@ +# Copyright (c) 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for a snapshot template test with empty source. 
+""" +from taurus.api.job_input import IJobInput + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + job_input.execute_query( + """ + DROP TABLE IF EXISTS {target_schema}.{target_table} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + job_input.execute_query( + """ + INSERT INTO {target_schema}.{target_table} VALUES + -- 2019-11-18 + ('sddc01-r01', 1, TIMESTAMP '2019-11-18', 5 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-18', 12, 3, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + -- 2019-11-19 + ('sddc01-r01', 1, TIMESTAMP '2019-11-19', 5 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-19', 4 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-19', 13, 3, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-19', 3 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc05-r02', 5, TIMESTAMP '2019-11-19', 20, 4, TIMESTAMP '2019-11-19 09:00:00') + """ + ) + + # Step 2: create a table that represents the next snapshot, empty in this case + + job_input.execute_query( + """ + DROP TABLE IF EXISTS {source_schema}.{source_view} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {source_schema}.{source_view} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the next snapshot + + job_input.execute_query( + """ + DROP TABLE IF EXISTS 
{expect_schema}.{expect_table} + """ + ) + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS {expect_schema}.{expect_table} ( + dim_sddc_sk VARCHAR, + dim_org_id INT, + dim_date_id TIMESTAMP, + host_count BIGINT, + cluster_count BIGINT, + {last_arrival_ts} TIMESTAMP + ) + """ + ) + job_input.execute_query( + """ + INSERT INTO {expect_schema}.{expect_table} VALUES + -- 2019-11-18 + ('sddc01-r01', 1, TIMESTAMP '2019-11-18', 5 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-18', 12, 3, TIMESTAMP '2019-11-18 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-18', 4 , 1, TIMESTAMP '2019-11-18 09:00:00'), + -- 2019-11-19 + ('sddc01-r01', 1, TIMESTAMP '2019-11-19', 5 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc02-r01', 2, TIMESTAMP '2019-11-19', 4 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc03-r01', 3, TIMESTAMP '2019-11-19', 13, 3, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc04-r01', 4, TIMESTAMP '2019-11-19', 3 , 1, TIMESTAMP '2019-11-19 09:00:00'), + ('sddc05-r02', 5, TIMESTAMP '2019-11-19', 20, 4, TIMESTAMP '2019-11-19 09:00:00') + """ + ) diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py new file mode 100644 index 0000000000..1513938527 --- /dev/null +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py @@ -0,0 +1,12 @@ +# Copyright (c) 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +from taurus.api.job_input import IJobInput + + +def run(job_input: IJobInput) -> None: + result = job_input.execute_template( + template_name="snapshot", + template_args=job_input.get_arguments(), + ) + if result.is_failed() and result.get_exception(): + raise result.get_exception() diff --git a/projects/vdk-core/plugins/vdk-trino/tests/jobs/test_move_data_strategy_job/02_move_data_to_target.py b/projects/vdk-core/plugins/vdk-trino/tests/jobs/test_move_data_strategy_job/02_move_data_to_target.py index dddfb09665..670dc54d0b 100644 --- a/projects/vdk-core/plugins/vdk-trino/tests/jobs/test_move_data_strategy_job/02_move_data_to_target.py +++ b/projects/vdk-core/plugins/vdk-trino/tests/jobs/test_move_data_strategy_job/02_move_data_to_target.py @@ -1,7 +1,7 @@ # Copyright (c) 2021 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 from taurus.api.job_input import IJobInput -from taurus.vdk.trino_utils import TrinoQueries +from taurus.vdk.trino_utils import TrinoTemplateQueries def run(job_input: IJobInput) -> None: @@ -13,5 +13,5 @@ def run(job_input: IJobInput) -> None: src = args.get("src") target = args.get("target") - trino_queries = TrinoQueries(job_input) + trino_queries = TrinoTemplateQueries(job_input) trino_queries.move_data_to_table(db, src, db, target) diff --git a/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_templates.py b/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_templates.py index 179f42d38a..5c717e3eda 100644 --- a/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_templates.py +++ b/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_templates.py @@ -11,7 +11,7 @@ from taurus.vdk.test_utils.util_funcs import cli_assert_equal from taurus.vdk.test_utils.util_funcs import CliEntryBasedTestRunner from taurus.vdk.test_utils.util_funcs import get_test_job_path -from taurus.vdk.trino_utils import TrinoQueries +from taurus.vdk.trino_utils import TrinoTemplateQueries VDK_DB_DEFAULT_TYPE = 
"VDK_DB_DEFAULT_TYPE" VDK_TRINO_PORT = "VDK_TRINO_PORT" @@ -20,7 +20,7 @@ "VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY" ) -org_move_data_to_table = TrinoQueries.move_data_to_table +org_move_data_to_table = TrinoTemplateQueries.move_data_to_table def trino_move_data_to_table_break_tmp_to_target( @@ -122,14 +122,18 @@ def test_scd2_template_restore_target_from_backup_on_start(self) -> None: expect_table = "ex_scmdb_people" result: Result = self.__scd2_template_execute( - test_schema, source_view, target_table, expect_table, "restore_from_backup" + test_schema, source_view, target_table, expect_table, True ) cli_assert_equal(0, result) + assert ( + f"Successfully recovered {test_schema}.{target_table}" in result.output + ), "Missing log for recovering target schema." + self.__scd2_template_check_expected_res(test_schema, target_table, expect_table) @mock.patch.object( - TrinoQueries, + TrinoTemplateQueries, "move_data_to_table", new=trino_move_data_to_table_break_tmp_to_target, ) @@ -148,7 +152,7 @@ def test_scd2_template_fail_last_step_and_restore_target(self): cli_assert_equal(0, self.__template_table_exists(test_schema, target_table)) @mock.patch.object( - TrinoQueries, + TrinoTemplateQueries, "move_data_to_table", new=trino_move_data_to_table_break_tmp_to_target_and_restore, ) @@ -170,6 +174,176 @@ def test_scd2_template_fail_last_step_and_fail_restore_target(self): f"Table {test_schema}.{target_table} is lost!" in result.output ), "Missing log for losing target schema." 
+ def test_fact_snapshot_template(self) -> None: + test_schema = "default" + source_view = "vw_fact_sddc_daily" + target_table = "dw_fact_sddc_daily" + expect_table = "ex_fact_sddc_daily" + + result: Result = self.__fact_snapshot_template_execute( + test_schema, source_view, target_table, expect_table + ) + cli_assert_equal(0, result) + + self.__fact_snapshot_template_check_expected_res( + test_schema, target_table, expect_table + ) + + def test_fact_snapshot_empty_source(self) -> None: + test_schema = "default" + source_view = "vw_fact_sddc_daily" + target_table = "dw_fact_sddc_daily" + expect_table = "ex_fact_sddc_daily" + + result: Result = self.__runner.invoke( + [ + "run", + get_test_job_path( + pathlib.Path(os.path.dirname(os.path.abspath(__file__))), + "load_fact_snapshot_template_job_empty_source", + ), + "--arguments", + json.dumps( + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "last_arrival_ts": "updated_at", + } + ), + ] + ) + cli_assert_equal(0, result) + + assert ( + f"Target table {test_schema}.{target_table} remains unchanged" + in result.output + ), "Cannot find log about empty source in output." + + self.__fact_snapshot_template_check_expected_res( + test_schema, target_table, expect_table + ) + + def test_fact_snapshot_template_restore_target_from_backup_on_start(self) -> None: + test_schema = "default" + source_view = "vw_scmdb_people" + target_table = "dw_scmdb_people" + expect_table = "ex_scmdb_people" + + result: Result = self.__fact_snapshot_template_execute( + test_schema, source_view, target_table, expect_table, True + ) + cli_assert_equal(0, result) + + assert ( + f"Successfully recovered {test_schema}.{target_table}" in result.output + ), "Missing log for recovering target schema." 
+ + self.__fact_snapshot_template_check_expected_res( + test_schema, target_table, expect_table + ) + + @mock.patch.object( + TrinoTemplateQueries, + "move_data_to_table", + new=trino_move_data_to_table_break_tmp_to_target, + ) + def test_fact_snapshot_template_fail_last_step_and_restore_target(self): + test_schema = "default" + source_view = "vw_scmdb_people" + target_table = "dw_scmdb_people" + expect_table = "ex_scmdb_people" + + result: Result = self.__fact_snapshot_template_execute( + test_schema, source_view, target_table, expect_table + ) + + # Check if template fails but target is successfully restored + cli_assert_equal(1, result) + cli_assert_equal(0, self.__template_table_exists(test_schema, target_table)) + + @mock.patch.object( + TrinoTemplateQueries, + "move_data_to_table", + new=trino_move_data_to_table_break_tmp_to_target_and_restore, + ) + def test_fact_snapshot_template_fail_last_step_and_fail_restore_target(self): + test_schema = "default" + source_view = "vw_scmdb_people" + target_table = "dw_scmdb_people" + expect_table = "ex_scmdb_people" + + result: Result = self.__fact_snapshot_template_execute( + test_schema, source_view, target_table, expect_table + ) + + # Check if template fails and target fails to be restored + cli_assert_equal(1, result) + cli_assert_equal(1, self.__template_table_exists(test_schema, target_table)) + + assert ( + f"Table {test_schema}.{target_table} is lost!" in result.output + ), "Missing log for losing target schema." 
+ + def __fact_snapshot_template_execute( + self, + test_schema, + source_view, + target_table, + expect_table, + restore_from_backup=False, + ): + return self.__runner.invoke( + [ + "run", + get_test_job_path( + pathlib.Path(os.path.dirname(os.path.abspath(__file__))), + "load_fact_snapshot_template_job", + ), + "--arguments", + json.dumps( + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "last_arrival_ts": "updated_at", + "test_restore_from_backup": f"{restore_from_backup}", + } + ), + ] + ) + + def __fact_snapshot_template_check_expected_res( + self, test_schema, target_table, expect_table + ) -> None: + actual_rs: Result = self.__runner.invoke( + [ + "trino-query", + "--query", + f"SELECT * FROM {test_schema}.{target_table} ORDER BY dim_date_id, dim_sddc_sk", + ] + ) + + expected_rs: Result = self.__runner.invoke( + [ + "trino-query", + "--query", + f"SELECT * FROM {test_schema}.{expect_table} ORDER BY dim_date_id, dim_sddc_sk", + ] + ) + + cli_assert_equal(0, actual_rs) + cli_assert_equal(0, expected_rs) + assert ( + actual_rs.output == expected_rs.output + ), f"Elements in {target_table} and {expect_table} differ." 
+ def __scd2_template_execute( self, test_schema, diff --git a/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_trino_utils.py b/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_trino_utils.py index 154237dfbd..772af0eb5c 100644 --- a/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_trino_utils.py +++ b/projects/vdk-core/plugins/vdk-trino/tests/test_vdk_trino_utils.py @@ -12,7 +12,7 @@ from taurus.vdk.test_utils.util_funcs import cli_assert_equal from taurus.vdk.test_utils.util_funcs import CliEntryBasedTestRunner from taurus.vdk.test_utils.util_funcs import get_test_job_path -from taurus.vdk.trino_utils import TrinoQueries +from taurus.vdk.trino_utils import TrinoTemplateQueries VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" VDK_TRINO_PORT = "VDK_TRINO_PORT" @@ -33,7 +33,7 @@ def setUp(self) -> None: def test_move_data_to_table_insert_select_strategy(self) -> None: with mock.patch.object( - TrinoQueries, + TrinoTemplateQueries, "get_move_data_to_table_strategy", return_value="INSERT_SELECT", ) as patched: @@ -41,13 +41,15 @@ def test_move_data_to_table_insert_select_strategy(self) -> None: def test_move_data_to_table_rename_strategy(self) -> None: with mock.patch.object( - TrinoQueries, "get_move_data_to_table_strategy", return_value="RENAME" + TrinoTemplateQueries, + "get_move_data_to_table_strategy", + return_value="RENAME", ) as patched: self.check_move_data_to_table_for_current_strategy() def test_move_data_to_table_invalid_strategy(self) -> None: with mock.patch.object( - TrinoQueries, + TrinoTemplateQueries, "get_move_data_to_table_strategy", return_value="INVALID_STRATEGY", ) as patched: