Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-trino: Add fact snapshot template #84

Merged
merged 4 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions projects/vdk-core/plugins/vdk-trino/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include src/taurus/vdk/templates/load/dimension/**/*
include src/taurus/vdk/templates/load/fact/**/*

# this only works for source distributions (setup.py sdist) and not for binary distributions (bdist)
# https://packaging.python.org/guides/using-manifest-in/
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,24 @@

from taurus.api.job_input import IJobInput
from taurus.vdk.core import errors
from taurus.vdk.trino_utils import TrinoQueries
from taurus.vdk.trino_utils import TrinoTemplateQueries

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
"""
In this step we try to recover a nonexistent target table from backup.
In this step we try to recover a potentially nonexistent target table from backup.
In some cases the template might fail during the step where new data is written in target table
(last step where tmp_target_table is renamed to target_table). If this happens, the job fails and
target table is no longer present. Fortunately it has a backup (backup_target_table).
(last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and
target table is no longer present. Fortunately it has a backup.
So when the job is retried, this first step should recover the target (if the reason for the previous fail
is no longer present).
"""

args = job_input.get_arguments()
target_schema = args.get("target_schema")
target_table = args.get("target_table")
tmp_target_table = "tmp_" + target_table
backup_target_table = "backup_" + target_table
trino_queries = TrinoTemplateQueries(job_input)

trino_queries = TrinoQueries(job_input)

if not trino_queries.table_exists(target_schema, target_table):
log.debug("If there is backup, try to recover target from it")
if trino_queries.table_exists(target_schema, backup_target_table):
log.debug("Try to recover target from backup")
try:
trino_queries.move_data_to_table(
target_schema, backup_target_table, target_schema, target_table
)
log.info(
f"""Successfully recovered {target_schema}.{target_table} from "
"{target_schema}.{backup_target_table}"""
)
except Exception as e:
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR,
log=log,
what_happened=f"Target table is unexistent and recovering it from backup table failed with "
f"exception: {e}",
why_it_happened=f"""One of the previous job retries failed after dropping "
"{target_schema}.{target_table} and before renaming "
"{target_schema}.{tmp_target_table} to "
"{target_schema}.{target_table}.""",
consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.",
countermeasures="You could try to recover {target_schema}.{target_table} from"
"{target_schema}.{backup_target_table} by hand and then "
"rerun the job."
"",
)

# if there is no target and no backup, the user provided invalid target table
# TODO: create target table automatically if not provided by user
else:
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
log=log,
what_happened="Cannot find target table",
why_it_happened=f"Template is called for unexistent target table: {target_schema}.{target_table}",
consequences="Current Step (python file) will fail, and as a result the whole Data Job will fail.",
countermeasures="Provide valid target table arguments.",
)
trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from taurus.api.job_input import IJobInput
from taurus.vdk.core import errors
from taurus.vdk.trino_utils import TrinoTemplateQueries

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """
    Move the result data from tmp_target_table (populated by the previous
    step) into the target table as safely as possible:

    1. Move the current contents of target_table into a backup table.
    2. Attempt to move tmp_target_table into target_table.
    3. If 2 fails, attempt to restore target_table from the backup.
    4. If 3 succeeds, drop the tmp table; the job still fails.
    5. If 3 also fails, target_table is lost, but its contents remain in the
       backup table; the next job retry recovers it in its first step.
    6. If 2 succeeds, drop the backup — we are done.
    """
    job_args = job_input.get_arguments()
    schema = job_args.get("target_schema")
    table = job_args.get("target_table")
    template_queries = TrinoTemplateQueries(job_input)

    # Keyword names match the TrinoTemplateQueries API; the tmp table was
    # created by the preceding step as "tmp_" + target_table.
    template_queries.perform_safe_move_data_to_table_step(
        from_db=schema,
        from_table_name=f"tmp_{table}",
        to_db=schema,
        to_table_name=table,
    )

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from taurus.api.job_input import IJobInput
from taurus.vdk.core import errors
from taurus.vdk.trino_utils import TrinoTemplateQueries

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """
    First template step: ensure the target table exists, recovering it from
    its backup when necessary.

    A previous run of the template may have failed after the target table was
    emptied but before the tmp table's contents were moved into it; in that
    case the data still lives in the backup table. On retry, this step
    restores the target (assuming the original cause of failure is gone).
    """
    job_args = job_input.get_arguments()
    schema = job_args.get("target_schema")
    table = job_args.get("target_table")

    template_queries = TrinoTemplateQueries(job_input)
    template_queries.ensure_target_exists_step(db=schema, target_name=table)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Schema-compatibility check: LIMIT 0 makes both branches return no rows,
-- so the UNION ALL only verifies that the source view and the target table
-- have union-compatible columns (it fails fast on a mismatch).
(SELECT * FROM {source_schema}.{source_view} LIMIT 0)
UNION ALL
(SELECT * FROM {target_schema}.{target_table} LIMIT 0)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Clean up any leftover tmp table from a previous (possibly failed) run.
DROP TABLE IF EXISTS {target_schema}.tmp_{target_table}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Clean up any leftover backup table from a previous (possibly failed) run.
DROP TABLE IF EXISTS {target_schema}.backup_{target_table}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Create an empty tmp table with the same column definitions as the target.
CREATE TABLE {target_schema}.tmp_{target_table}(
LIKE {target_schema}.{target_table}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Build the new snapshot in the tmp table:
--   * keep target rows strictly older than the earliest {last_arrival_ts}
--     present in the source view (rows the source does not overwrite), and
--   * append every row from the source view.
-- NOTE(review): rows in the target at or after MIN(source {last_arrival_ts})
-- are intentionally replaced by the source data — overwrite-by-time-window.
INSERT INTO {target_schema}.tmp_{target_table}
(
SELECT *
FROM {target_schema}.{target_table}
WHERE {last_arrival_ts} < (SELECT MIN({last_arrival_ts}) FROM {source_schema}.{source_view})
)
UNION ALL
(
SELECT *
FROM {source_schema}.{source_view}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (c) 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from taurus.api.job_input import IJobInput
from taurus.vdk.core import errors
from taurus.vdk.trino_utils import TrinoTemplateQueries

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """
    Move the result data from tmp_target_table (populated by the previous
    step) into the target table:

    1. Move the current contents of target_table into a backup table.
    2. Attempt to move tmp_target_table into target_table.
    3. If 2 fails, attempt to restore target_table from the backup.
    4. If 3 succeeds, drop the tmp table; the job still fails.
    5. If 3 also fails, target_table is lost, but its contents remain in the
       backup table; the next job retry recovers it in its first step.
    6. If 2 succeeds, drop the backup — we are done.

    Note: an empty tmp_target_table means the source view was empty, so the
    target is left unchanged and the empty tmp table is dropped.
    """
    job_args = job_input.get_arguments()
    schema = job_args.get("target_schema")
    view = job_args.get("source_view")
    table = job_args.get("target_table")
    tmp_table = f"tmp_{table}"
    template_queries = TrinoTemplateQueries(job_input)

    log.debug("Check if tmp target has data.")
    row_count = job_input.execute_query(
        f"""
        SELECT COUNT(*) FROM {schema}.{tmp_table}
        """
    )

    # Guard clause: nothing to move — keep the target intact and discard
    # the empty tmp table.
    if not row_count or row_count[0][0] <= 0:
        log.info(
            f"Target table {schema}.{table} remains unchanged "
            f"because source table {schema}.{view} was empty."
        )
        template_queries.drop_table(schema, tmp_table)
        return

    log.debug(
        "Confirmed that tmp target has data, proceed with moving it to target."
    )
    template_queries.perform_safe_move_data_to_table_step(
        from_db=schema,
        from_table_name=tmp_table,
        to_db=schema,
        to_table_name=table,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ def initialize_job(context: JobContext) -> None:
"scd2", pathlib.Path(get_job_path("load/dimension/scd2"))
)

# TODO: revise the template name — what this template currently does is not a snapshot (consider renaming it to append-overwrite)
context.templates.add_template(
"snapshot", pathlib.Path(get_job_path("load/fact/snapshot"))
)


@hookimpl(hookwrapper=True, trylast=True)
def run_step(context: JobContext, step: Step) -> None:
Expand Down
Loading