vdk-trino: Address snapshot (append_overwrite) template feedback
Since the template is not a classic snapshot implementation, its
documentation and name need to be changed. There is no need for the
backup table's name to be a parameter.

Remove the incorrect README.md; it will be revised and added in a different
commit. Add a TODO for renaming the template.
Add a method for getting the backup table name.

Testing done: templates unit tests.

The commit is a non-breaking change which does minor refactoring.

Signed-off-by: Yana Zhivkova ([email protected])
YanaZhivkova committed Aug 17, 2021
1 parent d9dc46d commit 900f3dd
Showing 8 changed files with 18 additions and 64 deletions.
1 change: 1 addition & 0 deletions projects/vdk-core/plugins/vdk-trino/MANIFEST.in
@@ -1,4 +1,5 @@
include src/taurus/vdk/templates/load/dimension/**/*
include src/taurus/vdk/templates/load/fact/**/*

# this only work for source distribution (setup.py sdist) and not for binary distribution (bdist)
# https://packaging.python.org/guides/using-manifest-in/
@@ -14,17 +14,14 @@ def run(job_input: IJobInput):
In this step we try to recover potentially unexistent target table from backup.
In some cases the template might fail during the step where new data is written in target table
(last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and
-target table is no longer present. Fortunately it has a backup (backup_target_table).
+target table is no longer present. Fortunately it has a backup.
So when the job is retried, this first step should recover the target (if the reason for the previous fail
is no longer present).
"""

args = job_input.get_arguments()
target_schema = args.get("target_schema")
target_table = args.get("target_table")
-backup_target_table = "backup_" + target_table
trino_queries = TrinoTemplateQueries(job_input)

-trino_queries.ensure_target_exists_step(
-    db=target_schema, target_name=target_table, backup_name=backup_target_table
-)
+trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table)
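
The recovery step described in the docstring above can be sketched without a Trino connection. This is a minimal illustration under stated assumptions, not the plugin's implementation: `InMemoryCatalog`, `backup_name_for` and `ensure_target_exists` are hypothetical names, the schema is modelled as a plain dict, and recovery is assumed to be a rename of the backup table.

```python
class InMemoryCatalog:
    """Hypothetical stand-in for a schema: table name -> list of rows."""

    def __init__(self, tables=None):
        self.tables = dict(tables or {})

    def table_exists(self, name):
        return name in self.tables

    def rename(self, old, new):
        self.tables[new] = self.tables.pop(old)


def backup_name_for(table_name):
    # Same "backup_" prefix convention as the helper added in this commit.
    return "backup_" + table_name


def ensure_target_exists(catalog, target_name):
    """Recover the target from its backup if the target is missing."""
    if catalog.table_exists(target_name):
        return "present"
    backup_name = backup_name_for(target_name)
    if catalog.table_exists(backup_name):
        # Assumption: recovery is modelled as a rename of the backup.
        catalog.rename(backup_name, target_name)
        return "recovered"
    raise RuntimeError(f"Table {target_name} is missing and has no backup")


catalog = InMemoryCatalog({"backup_sales": [1, 2]})
status = ensure_target_exists(catalog, "sales")
```

Callers pass only the target name; the backup name is derived inside the function, which mirrors the signature change in this commit.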
@@ -13,11 +13,11 @@ def run(job_input: IJobInput):
"""
In this step we try to move data from tmp_target_table (where we populated the result data in the previous step)
to target table in the following way:
-1. Move data from target_table to backup_target_table
+1. Move data from target_table to a backup table
2. Try to move data from tmp_target_table to target_table
3. If 2 fails, try to restore target from backup
4. If 3 succeeds, drop tmp target. The job fails.
-5. If 3 fails, target table is lost, its content are in backup_target_table. Next job retry will try to
+5. If 3 fails, target table is lost, its content are in the backup table. Next job retry will try to
recover target on its first step.
6. If 2 succeeds, drop backup, we are ready.
"""
@@ -26,13 +26,11 @@ def run(job_input: IJobInput):
target_schema = args.get("target_schema")
target_table = args.get("target_table")
tmp_target_table = "tmp_" + target_table
-backup_target_table = "backup_" + target_table
trino_queries = TrinoTemplateQueries(job_input)

trino_queries.perform_safe_move_data_to_table_step(
from_db=target_schema,
from_table_name=tmp_target_table,
to_db=target_schema,
to_table_name=target_table,
-backup_table_name=backup_target_table,
)
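
The numbered steps in the docstring above can be sketched as follows. This is an illustration under assumptions rather than the code in `TrinoTemplateQueries`: `safe_move` is a hypothetical function, tables live in a plain dict, and the `move` argument is a hypothetical hook that lets a failure of step 2 be simulated.

```python
def safe_move(tables, tmp_name, target_name, move=None):
    """Replace target with tmp, keeping a backup until the swap succeeds.

    `tables` is a plain dict (table name -> rows) standing in for a schema.
    `move` is a hypothetical hook so that a failing step 2 can be simulated.
    """

    def default_move(src, dst):
        tables[dst] = tables.pop(src)

    move = move or default_move
    backup_name = "backup_" + target_name

    # 1. Move the current target out of the way.
    default_move(target_name, backup_name)
    try:
        # 2. Promote the freshly computed data.
        move(tmp_name, target_name)
    except Exception:
        # 3. Promotion failed: try to restore the target from the backup.
        #    If this restore raised as well (step 5), the backup would stay
        #    in place for the next run's recovery step.
        default_move(backup_name, target_name)
        # 4. Restore succeeded: discard tmp and let the job fail.
        tables.pop(tmp_name, None)
        raise
    # 6. Promotion succeeded: the backup is no longer needed.
    del tables[backup_name]


tables = {"sales": ["old"], "tmp_sales": ["new"]}
safe_move(tables, "tmp_sales", "sales")
```

After a successful run only the target remains and holds the new rows. If the hook raises, the old rows are restored and the exception propagates so the job is marked as failed.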
@@ -14,17 +14,14 @@ def run(job_input: IJobInput):
In this step we try to recover potentially unexistent target table from backup.
In some cases the template might fail during the step where new data is written in target table
(last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and
-target table is no longer present. Fortunately it has a backup (backup_target_table).
+target table is no longer present. Fortunately it has a backup.
So when the job is retried, this first step should recover the target (if the reason for the previous fail
is no longer present).
"""

args = job_input.get_arguments()
target_schema = args.get("target_schema")
target_table = args.get("target_table")
-backup_target_table = "backup_" + target_table
trino_queries = TrinoTemplateQueries(job_input)

-trino_queries.ensure_target_exists_step(
-    db=target_schema, target_name=target_table, backup_name=backup_target_table
-)
+trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table)
@@ -13,24 +13,23 @@ def run(job_input: IJobInput):
"""
In this step we try to move data from tmp_target_table (where we populated the result data in the previous step)
to target table in the following way:
-1. Move data from target_table to backup_target_table
+1. Move data from target_table to a backup table
2. Try to move data from tmp_target_table to target_table
3. If 2 fails, try to restore target from backup
4. If 3 succeeds, drop tmp target. The job fails.
-5. If 3 fails, target table is lost, its content are in backup_target_table. Next job retry will try to
+5. If 3 fails, target table is lost, its content are in the backup table. Next job retry will try to
recover target on its first step.
6. If 2 succeeds, drop backup, we are ready.
Note: If there is no data in tmp_target_table, we are sure that the source table provided initially was empty,
-so we do nothing, target (snapshot) remains unchanged and we drop the empty tmp_target_table.
+so we do nothing, target remains unchanged and we drop the empty tmp_target_table.
"""

args = job_input.get_arguments()
target_schema = args.get("target_schema")
source_view = args.get("source_view")
target_table = args.get("target_table")
tmp_target_table = "tmp_" + target_table
-backup_target_table = "backup_" + target_table
trino_queries = TrinoTemplateQueries(job_input)

log.debug("Check if tmp target has data.")
@@ -48,7 +47,6 @@ def run(job_input: IJobInput):
from_table_name=tmp_target_table,
to_db=target_schema,
to_table_name=target_table,
-backup_table_name=backup_target_table,
)
else:
log.info(

This file was deleted.

@@ -116,6 +116,7 @@ def initialize_job(context: JobContext) -> None:
"scd2", pathlib.Path(get_job_path("load/dimension/scd2"))
)

+# TODO: revise template name, what this template does currently is not snapshot (maybe rename to append-overwrite)
context.templates.add_template(
"snapshot", pathlib.Path(get_job_path("load/fact/snapshot"))
)
@@ -99,15 +99,15 @@ def drop_table(self, db: str, table_name: str):
"""
)

-def ensure_target_exists_step(self, db: str, target_name: str, backup_name: str):
+def ensure_target_exists_step(self, db: str, target_name: str):
"""
This method checks if target exists. If it does not, an attempt to recover it from backup is initiated.
If there is no valid target at the end, error is raised.
:param db: Schema of the target table
:param target_name: Name of the target table
-:param backup_name: Name of the backup from which we might try to recover target (backup may not exists)
:return: None
"""
+backup_name = self.__get_backup_table_name(target_name)
if not self.table_exists(db, target_name):
log.debug("If there is backup, try to recover target from it")
if self.table_exists(db, backup_name):
@@ -147,7 +147,6 @@ def perform_safe_move_data_to_table_step(
from_table_name: str,
to_db: str,
to_table_name: str,
-backup_table_name: str,
):
"""
This method creates a backup table of the target, then tries to move data from source to target.
@@ -157,10 +156,10 @@ def perform_safe_move_data_to_table_step(
:param from_table_name: Name of the table we want to rename
:param to_db: Schema of the new table we want
:param to_table_name: Name of the new table
-:param backup_table_name: Name of the backup table we create
:return: None
"""
log.debug("Create backup from target")
+backup_table_name = self.__get_backup_table_name(to_table_name)
self.move_data_to_table(
from_db=to_db,
from_table_name=to_table_name,
@@ -227,3 +226,7 @@ def __is_table_not_found_error(exception):
isinstance(exception, TrinoUserError)
and exception.error_name == "TABLE_NOT_FOUND"
)

+@staticmethod
+def __get_backup_table_name(table_name):
+    return "backup_" + table_name
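
The new helper uses a double-underscore name, so Python's name mangling applies to it. The sketch below shows why the calls through `self` inside the class still resolve. `Queries` is a hypothetical, stripped-down class used only for illustration; it is not the plugin's `TrinoTemplateQueries`, and the schema-qualified return value is an assumption made to keep the example observable.

```python
class Queries:
    def ensure_target_exists_step(self, db, target_name):
        # Inside the class body this call is compiled as
        # self._Queries__get_backup_table_name(...), which matches the
        # mangled name under which the static method below is stored.
        backup_name = self.__get_backup_table_name(target_name)
        return f"{db}.{backup_name}"

    @staticmethod
    def __get_backup_table_name(table_name):
        return "backup_" + table_name


qualified = Queries().ensure_target_exists_step("analytics", "sales")
# The unmangled name is not an attribute of the class; the mangled one is.
has_plain = hasattr(Queries, "__get_backup_table_name")
has_mangled = hasattr(Queries, "_Queries__get_backup_table_name")
```

Keeping the helper private means the "backup_" naming convention has a single definition, and callers of the public steps no longer need to know it.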
