diff --git a/projects/vdk-plugins/vdk-impala/README.md b/projects/vdk-plugins/vdk-impala/README.md
index cf354fc921..de3a4d7e10 100644
--- a/projects/vdk-plugins/vdk-impala/README.md
+++ b/projects/vdk-plugins/vdk-impala/README.md
@@ -35,6 +35,10 @@ Then, from inside the run function in a Python step, you can use the `send_objec

 Run vdk config-help - search for those prefixed with "IMPALA_" to see what configuration options are available.

+# Disclaimer
+
+This plugin is tested against a single, specific Impala version: the one provided by the Impala docker image referenced in docker-compose.yaml. To find out which Impala version the plugin is tested against, look up that docker image.
+
 # Testing

 Testing this plugin locally requires installing the dependencies listed in vdk-plugins/vdk-impala/requirements.txt
diff --git a/projects/vdk-plugins/vdk-impala/requirements.txt b/projects/vdk-plugins/vdk-impala/requirements.txt
index 76421477c0..906d1c4217 100644
--- a/projects/vdk-plugins/vdk-impala/requirements.txt
+++ b/projects/vdk-plugins/vdk-impala/requirements.txt
@@ -7,3 +7,5 @@ click
 vdk-test-utils
 pytest-docker
 docker-compose
+pydantic
+pyarrow
diff --git a/projects/vdk-plugins/vdk-impala/setup.py b/projects/vdk-plugins/vdk-impala/setup.py
index 333514cee0..426722e7dc 100644
--- a/projects/vdk-plugins/vdk-impala/setup.py
+++ b/projects/vdk-plugins/vdk-impala/setup.py
@@ -14,7 +14,7 @@
     description="Versatile Data Kit SDK plugin provides support for Impala database.",
     long_description=pathlib.Path("README.md").read_text(),
     long_description_content_type="text/markdown",
-    install_requires=["vdk-core", "impyla", "tabulate"],
+    install_requires=["vdk-core", "impyla", "tabulate", "pydantic", "pyarrow"],
     package_dir={"": "src"},
     packages=setuptools.find_namespace_packages(where="src"),
     include_package_data=True,
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py
new file mode 100644
index 0000000000..7aebeb9f6f
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_helper.py
@@ -0,0 +1,163 @@
+# Copyright 2021 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+import logging
+from collections import OrderedDict
+
+import pyarrow
+from vdk.internal.core import errors
+from vdk.plugin.impala import impala_error_classifier
+from vdk.plugin.impala.impala_connection import ImpalaConnection
+
+
+class ImpalaHelper:
+    def __init__(self, db_connection: ImpalaConnection) -> None:
+        self._log = logging.getLogger(__name__)
+        self._db_connection = db_connection
+
+    def get_table_description(self, table_name):
+        self._log.debug(f"Retrieving details for table {table_name}.")
+        try:
+            return self._db_connection.execute_query(f"DESCRIBE formatted {table_name}")
+        except Exception as e:
+            if impala_error_classifier._is_authorization_error(e):
+                errors.log_and_throw(
+                    to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
+                    log=self._log,
+                    what_happened=f"Data loading into table {table_name} has failed.",
+                    why_it_happened=(
+                        f"You are trying to load data into a table which you do not have access to or it does not "
+                        f"exist: {table_name}."
+                    ),
+                    consequences="Data load will be aborted.",
+                    countermeasures="Make sure that the destination table exists and you have access to it.",
+                )
+            else:
+                raise e
+
+    def __get_table_schema(self, table_description, section_start="#", section_end="#"):
+        """
+        Gets column names and data types from an Impala table.
+        It would be much easier to parse the output of a plain DESCRIBE statement, but then we would need
+        two statements: one to get the table schema and one to check whether the table is stored as Parquet.
+        Returns the full table schema, including partition columns, ordered the same way as in Impala.
+        """
+        self._log.debug("Retrieving destination table schema.")
+        column_name_to_column_type_map = OrderedDict()
+        is_in_columns_section = False
+        searched_section_ended = False
+        searched_section_started = False
+        for (
+            column_name,
+            column_type,
+            _,
+        ) in table_description:  # 3rd value is column comment
+            if column_name is None or column_name.strip() == "":
+                continue
+            if column_name.startswith(section_start):  # new section begins
+                searched_section_started = True
+            if searched_section_started and not is_in_columns_section:
+                if column_name.strip() == "# col_name":  # column info follows
+                    is_in_columns_section = True
+                else:
+                    is_in_columns_section = False
+                continue
+            if searched_section_started:
+                if column_name.startswith(section_end):
+                    searched_section_ended = True
+                if is_in_columns_section and not searched_section_ended:
+                    column_name_to_column_type_map[
+                        column_name.strip()
+                    ] = column_type.strip()
+        return column_name_to_column_type_map
+
+    def get_table_columns(self, table_description):
+        """
+        :param table_description: result of #get_table_description
+        :return: dict with column name and type
+        """
+        return self.__get_table_schema(table_description, section_start="#")
+
+    def get_table_partitions(self, table_description):
+        """
+        :param table_description: result of #get_table_description
+        :return: dict with partition name and type
+        """
+        return self.__get_table_schema(
+            table_description, section_start="# Partition Information"
+        )
+
+    def ensure_table_format_is_parquet(self, table_name, table_description):
+        for key, value, _ in table_description:  # 3rd value is column comment
+            # Input format of parquet table is "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+            if key is not None and key.strip() == "InputFormat:":
+                if "parquet" in value:  # table is stored as parquet
+                    return
+                else:  # table is not stored as parquet
+                    errors.log_and_throw(
+                        to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
+                        log=self._log,
+                        what_happened="Data loading has failed.",  # FIXME: this is too specific
+                        why_it_happened=(
+                            f"You are trying to load data into a table {table_name} with an unsupported format. "
+                            f"Currently only Parquet table format is supported."
+                        ),
+                        consequences="Data load will be aborted.",  # FIXME: this is too specific
+                        countermeasures=(
+                            "Make sure that the destination table is stored as parquet: "
+                            "https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
+                            "#parquet_ddl"
+                        ),
+                    )
+        # TODO once there is more robust loading implemented the below error can be removed.
We can try to load even if + # we cannot determine the table storage type + errors.log_and_throw( + to_be_fixed_by=errors.ResolvableBy.PLATFORM_ERROR, + log=self._log, + what_happened="Cannot determine the target table file format, which is needed to load data into it.", + why_it_happened="There's a bug in VDK code.", + consequences="Application will exit.", + countermeasures="Report this bug to versatile data kit team.", + ) + + def generate_parquet_schema_from_table_schema(self, table_columns): + """ + Builds the parquet schema based on the column types and order in the target table, in order to ensure the new file + will be compatible with the table + """ + self._log.debug("Generating parquet file schema from table schema.") + impala_type_to_pyarrow_type_map = { + "string": pyarrow.string(), + "boolean": pyarrow.bool_(), + "double": pyarrow.float64(), + "float": pyarrow.float32(), + "int": pyarrow.int32(), + "bigint": pyarrow.int64(), + "timestamp": pyarrow.timestamp("ns"), + } + # including the decimal types in the map + for precision_value in range(1, 39): + for scale_value in range(0, precision_value + 1): + impala_type_to_pyarrow_type_map[ + f"decimal({precision_value},{scale_value})" + ] = pyarrow.decimal128(precision_value, scale_value) + + parquet_schema = [] + for column_name, column_type in table_columns.items(): + parquet_schema.append( + (column_name, impala_type_to_pyarrow_type_map[column_type]) + ) + return pyarrow.schema(parquet_schema) + + def get_parquet_schema(self, table): + table_description = self.get_table_description(table) + self.ensure_table_format_is_parquet(table, table_description) + table_columns = self.get_table_columns(table_description) + return self.generate_parquet_schema_from_table_schema(table_columns) + + @staticmethod + def get_insert_sql_partition_clause(partitions): + # https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/impala_insert.html + + # NOTE: https://github.com/kayak/pypika looks interesting if we start having more complex query buildings + sql = "PARTITION (" + ",".join("`" + p + "`" for p in partitions.keys()) + ")" + return sql diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py index b692ab568e..441a08a782 100644 --- a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/impala_plugin.py @@ -1,6 +1,8 @@ # Copyright 2021 VMware, Inc. 
# SPDX-License-Identifier: Apache-2.0 import logging +import os +import pathlib from typing import List import click @@ -71,6 +73,38 @@ def initialize_job(self, context: JobContext) -> None: lambda: _connection_by_configuration(self._impala_cfg), ) + context.templates.add_template( + "load/dimension/scd1", pathlib.Path(get_job_path("load/dimension/scd1")) + ) + + context.templates.add_template( + "scd1", pathlib.Path(get_job_path("load/dimension/scd1")) + ) + + context.templates.add_template( + "load/dimension/scd2", pathlib.Path(get_job_path("load/dimension/scd2")) + ) + + context.templates.add_template( + "scd2", pathlib.Path(get_job_path("load/dimension/scd2")) + ) + + context.templates.add_template( + "load/fact/snapshot", pathlib.Path(get_job_path("load/fact/snapshot")) + ) + + context.templates.add_template( + "snapshot", pathlib.Path(get_job_path("load/fact/snapshot")) + ) + + context.templates.add_template( + "load/versioned", pathlib.Path(get_job_path("load/versioned")) + ) + + context.templates.add_template( + "versioned", pathlib.Path(get_job_path("load/versioned")) + ) + @staticmethod @hookimpl(hookwrapper=True, tryfirst=True) def run_step(context: JobContext, step: Step) -> None: @@ -116,3 +150,14 @@ def db_connection_decorate_operation(self, decoration_cursor: DecorationCursor): @hookimpl def vdk_start(plugin_registry: IPluginRegistry, command_line_args: List): plugin_registry.load_plugin_with_hooks_impl(ImpalaPlugin(), "impala-plugin") + + +def get_jobs_parent_directory() -> pathlib.Path: + current_dir = pathlib.Path(os.path.dirname(os.path.abspath(__file__))) + jobs_dir = current_dir.joinpath("templates") + return jobs_dir + + +def get_job_path(job_name: str) -> str: + """Get the path of the test data job returned as string so it can be passed easier as cmd line args""" + return str(get_jobs_parent_directory().joinpath(job_name)) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/README.md b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/README.md new file mode 100644 index 0000000000..b69adc88c8 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/README.md @@ -0,0 +1,3 @@ +### Types of data loading templates +- Slowly Changing Dimension Type 1 - TBD +- Snapshot Accumulating Fact Table - TBD diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/00-dimension-scd1-definition.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/00-dimension-scd1-definition.py new file mode 100644 index 0000000000..115a16b2c7 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/00-dimension-scd1-definition.py @@ -0,0 +1,27 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from pydantic import BaseModel +from vdk.api.job_input import IJobInput +from vdk.plugin.impala.templates.template_arguments_validator import ( + TemplateArgumentsValidator, +) + + +class SlowlyChangingDimensionTypeOverwriteParams(BaseModel): + target_schema: str + target_table: str + source_schema: str + source_view: str + + +class SlowlyChangingDimensionTypeOverwrite(TemplateArgumentsValidator): + TemplateParams = SlowlyChangingDimensionTypeOverwriteParams + + def __init__(self) -> None: + super().__init__() + + +def run(job_input: IJobInput): + SlowlyChangingDimensionTypeOverwrite().get_validated_args( + job_input, job_input.get_arguments() + ) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/01-test-if-view-matches-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/01-test-if-view-matches-target.sql new file mode 100644 index 0000000000..a0cb6c08ba --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/01-test-if-view-matches-target.sql @@ -0,0 +1,3 @@ +SELECT * FROM {source_schema}.{source_view} LIMIT 0 +UNION ALL +SELECT * FROM {target_schema}.{target_table} LIMIT 0; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/02-insert-into-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/02-insert-into-target.sql new file mode 100644 index 0000000000..bf328c471c --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/02-insert-into-target.sql @@ -0,0 +1,9 @@ +/* TO DO DROP AND RECREATE TARGET TABLE ON FULL RELOAD OR DATA TYPE CHANGE */ + +-- DROP TABLE {target_schema}.{target_table}; +-- CREATE TABLE {target_schema}.{target_table} STORED AS PARQUET AS SELECT * FROM {target_schema}.{target_table}; + +-- /* +SHUFFLE */ below is a query hint to Impala. Do not remove! +-- See https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_hints.html for details. 
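+-- Note: {_vdk_template_insert_partition_clause} is filled in by the template runner before execution:
+-- for a partitioned target table it expands to a clause of the form PARTITION (`col1`,`col2`)
+-- (built by ImpalaHelper.get_insert_sql_partition_clause); for a non-partitioned table it is empty.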
+INSERT OVERWRITE TABLE {target_schema}.{target_table} {_vdk_template_insert_partition_clause} /* +SHUFFLE */ +SELECT * FROM {source_schema}.{source_view}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/03-refresh.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/03-refresh.sql new file mode 100644 index 0000000000..ee3e889787 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/03-refresh.sql @@ -0,0 +1,2 @@ +-- make sure metadata about the new blocks in the {target_table} is propagated to the other impalad deamons +REFRESH {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/04-compute-stats.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/04-compute-stats.sql new file mode 100644 index 0000000000..00d3e0ef15 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/04-compute-stats.sql @@ -0,0 +1 @@ +COMPUTE STATS {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/README.md b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/README.md new file mode 100644 index 0000000000..39d5319308 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/README.md @@ -0,0 +1,43 @@ +### Purpose: + +This template can be used to load raw data from the Data Lake to target 'Slowly Changing Dimension Type 1' table in the Data Warehouse. +In summary, it overwrites the target table with the source data. + +### Details: + + +### Template Name (template_name): + +- "load/dimension/scd1" + +### Template Parameters (template_args): + +- target_schema - SC Data Warehouse schema, where target data is loaded +- target_table - SC Data Warehouse table of DW type 'Slowly Changing Dimension Type 1', where target data is loaded +- source_schema - SC Data Lake schema, where source raw data is loaded from +- source_view - SC Data Lake view, where source raw data is loaded from + +### Prerequisites: + +In order to use this template you need to ensure the following: +- {source_schema}.{source_view} exists +- {target_schema}.{target_table} exists +- {source_schema}.{source_view} has the exact same schema as {target_schema}.{target_table} + +### Sample Usage: + +Say there is SDDC-related 'Slowly Changing Dimension Type 1' target table called 'dim_sddc' in 'history' schema. +Updating it with the latest raw data from the Data Lake (from source view called 'vw_dim_sddc' in 'default' schema) is done in the following manner: + +```python +def run(job_input): + # . . . + template_args = { + 'source_schema': 'default', + 'source_view': 'vw_dim_sddc', + 'target_schema': 'history', + 'target_table': 'dim_sddc', + } + job_input.execute_template("load/dimension/scd1", template_args) + # . . . +``` diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/00-dimension-scd2-definition.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/00-dimension-scd2-definition.py new file mode 100644 index 0000000000..3885cc9a37 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/00-dimension-scd2-definition.py @@ -0,0 +1,32 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from pydantic import BaseModel +from vdk.api.job_input import IJobInput +from vdk.plugin.impala.templates.template_arguments_validator import ( + TemplateArgumentsValidator, +) + + +class SlowlyChangingDimensionType2Params(BaseModel): + target_schema: str + target_table: str + source_schema: str + source_view: str + start_time_column: str + end_time_column: str + end_time_default_value: str + surrogate_key_column: str + id_column: str + + +class SlowlyChangingDimensionType2(TemplateArgumentsValidator): + TemplateParams = SlowlyChangingDimensionType2Params + + def __init__(self) -> None: + super().__init__() + + +def run(job_input: IJobInput): + SlowlyChangingDimensionType2().get_validated_args( + job_input, job_input.get_arguments() + ) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/01-test-if-view-matches-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/01-test-if-view-matches-target.sql new file mode 100644 index 0000000000..7a565278b0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/01-test-if-view-matches-target.sql @@ -0,0 +1,3 @@ +SELECT uuid(), * FROM {source_schema}.{source_view} LIMIT 0 +UNION ALL +SELECT * FROM {target_schema}.{target_table} LIMIT 0; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/02-insert-into-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/02-insert-into-target.sql new file mode 100644 index 0000000000..024abc5c28 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/02-insert-into-target.sql @@ -0,0 +1,40 @@ +/* TO DO DROP AND RECREATE TARGET TABLE ON FULL RELOAD OR DATA TYPE CHANGE */ + +-- DROP TABLE {target_schema}.{target_table}; +-- CREATE TABLE {target_schema}.{target_table} STORED AS PARQUET AS SELECT * FROM {target_schema}.stg_{target_table}; + +-- /* +SHUFFLE */ below is a query hint to Impala. +-- Do not remove! 
https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_hints.html +INSERT OVERWRITE TABLE {target_schema}.{target_table} {_vdk_template_insert_partition_clause} /* +SHUFFLE */ +WITH + -- filter from the target all elements that define non-current state and are not updated/present in the source + tgt_filtered AS ( + SELECT * + FROM {target_schema}.{target_table} + WHERE {end_time_column} != '{end_time_default_value}' + AND CONCAT(CAST({id_column} AS STRING), CAST({start_time_column} AS STRING)) NOT IN ( + SELECT CONCAT(CAST({id_column} AS STRING), CAST({start_time_column} AS STRING)) + FROM {source_schema}.{source_view} + ) + ), + -- filter from the source all elements which are present in tgt_filtered + src_filtered AS ( + SELECT * + FROM {source_schema}.{source_view} + WHERE CONCAT(CAST({id_column} AS STRING), CAST({start_time_column} AS STRING)) NOT IN ( + SELECT CONCAT(CAST({id_column} AS STRING), CAST({start_time_column} AS STRING)) + FROM tgt_filtered + ) + ) +( + SELECT * + FROM tgt_filtered +) +UNION ALL +( + SELECT COALESCE(tgt.{surrogate_key_column}, uuid()), src.* + FROM src_filtered AS src + LEFT JOIN {target_schema}.{target_table} AS tgt + ON src.{id_column} = tgt.{id_column} + AND src.{start_time_column} = tgt.{start_time_column} +) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/03-refresh.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/03-refresh.sql new file mode 100644 index 0000000000..ee3e889787 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/03-refresh.sql @@ -0,0 +1,2 @@ +-- make sure metadata about the new blocks in the {target_table} is propagated to the other impalad deamons +REFRESH {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/04-compute-stats.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/04-compute-stats.sql new file mode 100644 index 0000000000..00d3e0ef15 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/04-compute-stats.sql @@ -0,0 +1 @@ +COMPUTE STATS {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/README.md b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/README.md new file mode 100644 index 0000000000..318a3a0680 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/README.md @@ -0,0 +1,55 @@ +### Purpose: + +Template used to load raw data from a Data Lake to target 'Slowly Changing Dimension Type 2' table in a Data Warehouse. 
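+In summary, it retains historical target records that are absent from the source view, overrides those that are present in it, and takes the whole current state from the source view.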
+ +### Details: + +Explanation of SCD type 2 can be seen here: + +### Template Name (template_name): + +- "load/dimension/scd2" + +### Template Parameters (template_args): + +- target_schema - SC Data Warehouse schema, where target data is loaded +- target_table - SC Data Warehouse table of DW type 'Slowly Changing Dimension Type 2', where target data is loaded +- source_schema - SC Data Lake schema, where source raw data is loaded from +- source_view - SC Data Lake view, where source raw data is loaded from +- start_time_column - Column that holds the start time of the period for which a record is valid +- end_time_column - Column that holds the end time of the period for which a record is valid +- end_time_default_value - Default value for end time column used to indicate whether this is the current state of the record, e.g. '2999-01-01T00:00:00Z' +- surrogate_key_column - Column that holds unique id permanently bound to the time period defined by that row of the slowly changing dimension table. Useful for efficient joins with other fact tables. +- id_column - Column that holds the natural key of the target table. + +### Prerequisites: + +In order to use this template you need to ensure the following: +- {source_schema}.{source_view} exists +- {target_schema}.{target_table} exists +- The schema of {target_schema}.{target_table} must begin with a string column (used to hold the surrogate key) followed by all columns of {source_schema}.{source_view}. +- {source_schema}.{source_view} must contain all columns specified in the Parameters section. +- In {source_schema}.{source_view}, for records which represent current state their end_time value must be the same as the value provided as end_time_default_value + +### Sample Usage: + +Say there is SDDC-related 'Slowly Changing Dimension Type 2' target table called 'dim_sddc_h' in 'history' schema. +Updating end date of existing current records representing current state and adding new state records (from source view called 'vw_dim_sddc_h' in 'default' schema) is done in the following manner: + +```python +def run(job_input): + # . . . + template_args = { + 'source_schema': 'default', + 'source_view': 'vw_dim_sddc_h', + 'target_schema': 'history', + 'target_table': 'dim_sddc_h', + 'start_time_column': '', + 'end_time_default_value': '2999-01-01T00:00:00Z', + 'end_time_column': '', + 'surrogate_key_column': '', + 'id_column': '' + } + job_input.execute_template('load/dimension/scd2', template_args) + # . . . +``` diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd2/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/00-fact-snapshot-definition.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/00-fact-snapshot-definition.py new file mode 100644 index 0000000000..501cb19cf7 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/00-fact-snapshot-definition.py @@ -0,0 +1,26 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from pydantic import BaseModel +from vdk.api.job_input import IJobInput +from vdk.plugin.impala.templates.template_arguments_validator import ( + TemplateArgumentsValidator, +) + + +class FactDailySnapshotParams(BaseModel): + target_schema: str + target_table: str + source_schema: str + source_view: str + last_arrival_ts: str + + +class FactDailySnapshot(TemplateArgumentsValidator): + TemplateParams = FactDailySnapshotParams + + def __init__(self) -> None: + super().__init__() + + +def run(job_input: IJobInput): + FactDailySnapshot().get_validated_args(job_input, job_input.get_arguments()) diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/01-test-if-view-matches-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/01-test-if-view-matches-target.sql new file mode 100644 index 0000000000..a0cb6c08ba --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/01-test-if-view-matches-target.sql @@ -0,0 +1,3 @@ +SELECT * FROM {source_schema}.{source_view} LIMIT 0 +UNION ALL +SELECT * FROM {target_schema}.{target_table} LIMIT 0; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/02-insert-into-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/02-insert-into-target.sql new file mode 100644 index 0000000000..af01c94a4d --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/02-insert-into-target.sql @@ -0,0 +1,13 @@ +-- /* +SHUFFLE */ below is a query hint to Impala. +-- Do not remove! 
https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_hints.html +INSERT OVERWRITE TABLE {target_schema}.{target_table} {_vdk_template_insert_partition_clause} /* +SHUFFLE */ +( + SELECT * + FROM {target_schema}.{target_table} + WHERE {last_arrival_ts} < (SELECT MIN({last_arrival_ts}) FROM {source_schema}.{source_view}) +) +UNION ALL +( + SELECT * + FROM {source_schema}.{source_view} +); diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/03-refresh.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/03-refresh.sql new file mode 100644 index 0000000000..a865459dc4 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/03-refresh.sql @@ -0,0 +1 @@ +REFRESH {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/04-compute-stats.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/04-compute-stats.sql new file mode 100644 index 0000000000..00d3e0ef15 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/04-compute-stats.sql @@ -0,0 +1 @@ +COMPUTE STATS {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/README.md b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/README.md new file mode 100644 index 0000000000..d521f6a820 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/README.md @@ -0,0 +1,48 @@ +### Purpose: + +This template can be used to load raw data from Data Lake to target 'Snapshot Periodic Fact Table' in Data Warehouse. +In summary, it appends a snapshot of records observed between time t1 and t2 from the source table to the target table, +truncating all present target table records observed after t1. + +### Details: + + + +### Template Name (template_name): + +- "load/fact/snapshot" + +### Template Parameters (template_args): + +- target_schema - SC Data Warehouse schema, where target data is loaded +- target_table - SC Data Warehouse table of DW type 'Snapshot Periodic Fact Table', where target data is loaded +- source_schema - SC Data Lake schema, where source raw data is loaded from +- source_view - SC Data Lake view, where source raw data is loaded from +- last_arrival_ts - Timestamp column, on which increments to target_table are done + +### Prerequisites: + +In order to use this template you need to ensure the following: +- {source_schema}.{source_view} exists +- {target_schema}.{target_table} exists +- {source_schema}.{source_view} has the exact same schema as {target_schema}.{target_table} +- {last_arrival_ts} is timestamp column suitable for 'Snapshot Periodic Fact Table' increments + +### Sample Usage: + +Say there is SDDC-related 'Snapshot Periodic Fact Table' called 'fact_sddc_daily' in 'history' schema. +Updating it with the latest raw data from a Data Lake (from source view called 'vw_fact_sddc_daily' in 'default' schema) is done in the following manner: + +```python +def run(job_input): + # . . . + template_args = { + 'source_schema': 'default', + 'source_view': 'vw_fact_sddc_daily', + 'target_schema': 'history', + 'target_table': 'fact_sddc_daily', + 'last_arrival_ts': 'updated_at', + } + job_input.execute_template('load/fact/snapshot', template_args) + # . . . 
+```
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/__init__.py
new file mode 100644
index 0000000000..50c007580a
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2021 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/params.json b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/params.json
new file mode 100644
index 0000000000..59c534dde7
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/params.json
@@ -0,0 +1,7 @@
+[
+    "target_schema",
+    "target_table",
+    "source_schema",
+    "source_view",
+    "last_arrival_ts"
+]
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/00-versioned-definition.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/00-versioned-definition.py
new file mode 100644
index 0000000000..44b7812dc1
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/00-versioned-definition.py
@@ -0,0 +1,64 @@
+# Copyright 2021 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+from typing import List
+
+from pydantic import BaseModel
+from pydantic import validator
+from vdk.api.job_input import IJobInput
+from vdk.plugin.impala.templates.template_arguments_validator import (
+    TemplateArgumentsValidator,
+)
+
+
+class LoadVersionedParams(BaseModel):
+    target_schema: str
+    target_table: str
+    source_schema: str
+    source_view: str
+    id_column: str
+    value_columns: List[str]
+    tracked_columns: List[str]
+    updated_at_column: str = "updated_at"
+    sk_column: str = "sk"
+    active_from_column: str = "active_from"
+    active_to_column: str = "active_to"
+    active_to_max_value: str = "9999-12-31"
+
+    @validator("tracked_columns", allow_reuse=True)
+    def tracked_columns_are_value_columns(cls, tracked_columns, values, **kwargs):
+        value_columns = values.get("value_columns")
+        if type(value_columns) == list and not tracked_columns:
+            raise ValueError("The list must contain at least one column")
+        if type(value_columns) == list == type(tracked_columns) and not set(
+            tracked_columns
+        ) <= set(value_columns):
+            raise ValueError(
+                "All elements in the list must be also present in `value_columns`"
+            )
+        return tracked_columns
+
+
+class LoadVersioned(TemplateArgumentsValidator):
+    TemplateParams = LoadVersionedParams
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    def _validate_args(self, args: dict) -> dict:
+        args = super()._validate_args(args)
+        return dict(
+            **args,
+            value_columns_str=", ".join(
+                [f"`{column}`" for column in args["value_columns"]]
+            ),
+            hash_expr_str=",\n".join(
+                [
+                    f"          COALESCE(CAST(`{column}` AS STRING), '#')"
+                    for column in args["tracked_columns"]
+                ]
+            ).lstrip(),
+        )
+
+
+def run(job_input: IJobInput):
+    LoadVersioned().get_validated_args(job_input, job_input.get_arguments())
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/01-test-if-view-matches-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/01-test-if-view-matches-target.sql
new file mode 100644
index 0000000000..8ad6fc66d7
--- /dev/null
+++
b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/01-test-if-view-matches-target.sql @@ -0,0 +1,19 @@ +( + SELECT + NULL as `{sk_column}`, + NULL as `{active_from_column}`, + NULL as `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{source_schema}`.`{source_view}` + LIMIT 0 +) +UNION ALL +( + SELECT + * + FROM + `{target_schema}`.`{target_table}` + LIMIT 0 +); diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/02-insert-into-target.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/02-insert-into-target.sql new file mode 100644 index 0000000000..80324a7403 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/02-insert-into-target.sql @@ -0,0 +1,117 @@ +/* TO DO DROP AND RECREATE TARGET TABLE ON FULL RELOAD OR DATA TYPE CHANGE */ + +-- DROP TABLE {target_schema}.{target_table}; +-- CREATE TABLE {target_schema}.{target_table} STORED AS PARQUET AS SELECT * FROM {target_schema}.stg_{target_table}; + +-- /* +SHUFFLE */ below is a query hint to Impala. +-- Do not remove! https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_hints.html +INSERT OVERWRITE TABLE {target_schema}.{target_table} {_vdk_template_insert_partition_clause} /* +SHUFFLE */ +WITH + -- Compute the union of the source and target tables and augment each side with the following columns: + -- `_values_hash`: + -- A hash of the tracked values. + -- `_lineage`: + -- Tag the record source (either "source" or "target"). + `{target_table}_union` AS ( + ( + SELECT + 'source' AS `_lineage`, + -- FIXME: change from '#' to NULL results in collision + FNV_HASH( + CONCAT_WS('|', + {hash_expr_str} + ) + ) AS `_values_hash`, + UUID() as `{sk_column}`, + `{updated_at_column}` AS `{active_from_column}`, + CAST("9999-12-31" AS TIMESTAMP) AS `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{source_schema}`.`{source_view}` + ) + UNION ALL + ( + SELECT + 'target' AS `_lineage`, + -- FIXME: change from '#' to NULL results in collision + FNV_HASH( + CONCAT_WS('|', + {hash_expr_str} + ) + ) AS `_values_hash`, + `{sk_column}`, + `{active_from_column}`, + `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{target_schema}`.`{target_table}` + ) + ), + -- Extend `{target_table}_union` with the following expressions: + -- `_is_overridden`: + -- True if and only if the record is overridden by a following record in a partition of records that share the + -- same primary key and start time, where "target" records are listed before "source". + -- `{sk_column}`, `{active_to_column}`: + -- The first value of that column from the partition of records sharing the same primary key and start time, + -- where "target" records are listed before "source". + -- The result set is ready for elimination of overridden records. 
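+    -- For example (hypothetical rows): if id=1 with active_from=t1 appears on both sides, ordering by
+    -- `_lineage` DESC lists the "target" row before the "source" row, so LEAD() flags the "target" row
+    -- as overridden, while FIRST_VALUE() carries its surrogate key and "active to" value over to the
+    -- surviving "source" row.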
+ `{target_table}_union_extended_v1` AS ( + SELECT + `_lineage`, + `_values_hash`, + LEAD(TRUE, 1, FALSE) OVER( + PARTITION BY `{id_column}`, `{active_from_column}` + ORDER BY `_lineage` DESC + ) AS `_is_overridden`, + FIRST_VALUE(`{sk_column}`) OVER( + PARTITION BY `{id_column}`, `{active_from_column}` + ORDER BY `_lineage` DESC + ) AS `{sk_column}`, + `{active_from_column}`, + FIRST_VALUE(`{active_to_column}`) OVER( + PARTITION BY `{id_column}`, `{active_from_column}` + ORDER BY `_lineage` DESC + ) AS `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{target_table}_union` + ), + -- Exclude overridden records from `{target_table}_union_extended_v1` and + -- extend the result with the following expressions: + -- `_merge_with_previous`: + -- A boolean flag indicating whether the record will be merged with the previous record within a partition of + -- records that share the same primary key and values hash and are ordered by their "active from" timestamp. + -- The result set is ready for merging of adjacent records with the same values hash. + `{target_table}_union_extended_v2` AS ( + SELECT + LAG(`_values_hash`, 1, 0) OVER( + PARTITION BY `{id_column}` + ORDER BY `{active_from_column}` + ) = `_values_hash` AS `_merge_with_previous`, + `{sk_column}`, + `{active_from_column}`, + `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{target_table}_union_extended_v1` + WHERE + `_is_overridden` = FALSE + ) + -- Exclude records that are merged with the preceding record and fix the "active to" timestamp. + SELECT + `{sk_column}`, + `{active_from_column}`, + LEAD(`{active_from_column}`, 1, '9999-12-31') OVER( + PARTITION BY `{id_column}` + ORDER BY `{active_from_column}` + ) AS `{active_to_column}`, + `{id_column}`, + {value_columns_str} + FROM + `{target_table}_union_extended_v2` + WHERE + `_merge_with_previous` = FALSE; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/03-refresh.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/03-refresh.sql new file mode 100644 index 0000000000..ee3e889787 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/03-refresh.sql @@ -0,0 +1,2 @@ +-- make sure metadata about the new blocks in the {target_table} is propagated to the other impalad deamons +REFRESH {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/04-compute-stats.sql b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/04-compute-stats.sql new file mode 100644 index 0000000000..00d3e0ef15 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/04-compute-stats.sql @@ -0,0 +1 @@ +COMPUTE STATS {target_schema}.{target_table}; diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/README.md b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/README.md new file mode 100644 index 0000000000..e2b36f7054 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/README.md @@ -0,0 +1,56 @@ +### Purpose: + +Template used to load raw data from a Data Lake to target 'Slowly Changing Dimension Type 2' table in a Data Warehouse. +In summary, it accumulates updates from the data source as versioned records in the target table. 
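+
+For illustration (hypothetical data): if the source reports a tracked column of record `id = 1` changing at time `t2`, the template closes that record's currently open version (its "active to" value becomes `t2`) and appends a new open version with "active from" equal to `t2` and "active to" equal to `active_to_max_value`.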
+
+### Details:
+
+Explanation of SCD type 2 can be seen here:
+
+### Template Name (template_name):
+
+- "load/versioned"
+
+### Template Parameters (template_args):
+
+- target_schema - Target schema where the versioned data is stored. Typically, a Data Warehouse (DW) schema.
+- target_table - Target table where the versioned data is loaded. Typically, a Slowly Changing Dimension (SCD) of Type 2.
+- source_schema - SC Data Lake schema containing the source view.
+- source_view - SC Data Lake view where source data is loaded from.
+- id_column - Column that holds the natural key of the target table.
+- value_columns - A list of columns from the source that hold the record's values. Present both in the source and the target tables.
+- tracked_columns - A sublist of the value columns that are tracked for changes. Present both in the source and the target tables.
+- updated_at_column - A column containing the update time of a record. Present in the source table. Optional (default value is "updated_at").
+- sk_column - A surrogate key column that is automatically generated in the target table. Optional (default value is "sk").
+- active_from_column - A column denoting the start time of a record in the target table. Optional (default value is "active_from").
+- active_to_column - A column denoting the end time of a record in the target table. Equals `active_to_max_value` if the record is not closed. Optional (default value is "active_to").
+- active_to_max_value - A value denoting an open record in the target table. Optional (default value is "9999-12-31").
+
+### Prerequisites:
+
+In order to use this template you need to ensure the following:
+
+- `{source_schema}`.`{source_view}` exists and consists of the `id_column`, the `value_columns`, and the `updated_at_column`.
+- `{target_schema}`.`{target_table}` exists and consists of the following columns (in this order): `{sk_column}`, `{active_from_column}`, `{active_to_column}`, `{id_column}`, and `{value_columns}`.
+
+### Sample Usage:
+
+Say there is SDDC-related 'Slowly Changing Dimension Type 2' target table called 'dim_sddc_h' in 'history' schema.
+
+Updating the end date of existing current records representing current state and adding new state records (from source view called 'vw_dim_sddc_h' in 'default' schema) is done in the following manner:
+
+```python
+def run(job_input):
+    # . . .
+    template_args = {
+        'source_schema': 'default',
+        'source_view': 'vw_dim_sddc_h',
+        'target_schema': 'history',
+        'target_table': 'dim_sddc_h',
+        'id_column': 'sddc_id',
+        'value_columns': ['hosts', 'state', 'is_nsxt', 'cloud_vendor', 'version'],
+        'tracked_columns': ['hosts', 'state', 'is_nsxt', 'cloud_vendor', 'version'],
+    }
+    job_input.execute_template('load/versioned', template_args)
+    # . . .
+```
diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/__init__.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/__init__.py
new file mode 100644
index 0000000000..50c007580a
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/versioned/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2021 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py new file mode 100644 index 0000000000..38e96c4340 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/template_arguments_validator.py @@ -0,0 +1,67 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from logging import getLogger +from typing import cast +from typing import Type + +from pydantic import BaseModel +from pydantic import ValidationError +from vdk.api.job_input import IJobInput +from vdk.internal.builtin_plugins.run.job_input import JobInput +from vdk.internal.core import errors +from vdk.plugin.impala.impala_helper import ImpalaHelper + +log = getLogger(__name__) + + +class TemplateArgumentsValidator: + TemplateParams: Type[BaseModel] + + def __init__(self) -> None: + pass + + def get_validated_args(self, job_input: IJobInput, args: dict) -> dict: + args.update(self._validate_args(args)) + args["_vdk_template_insert_partition_clause"] = "" + + impala_helper = ImpalaHelper(cast(JobInput, job_input).get_managed_connection()) + table_name = "`{target_schema}`.`{target_table}`".format(**args) + table_description = impala_helper.get_table_description(table_name) + partitions = impala_helper.get_table_partitions(table_description) + if partitions: + args[ + "_vdk_template_insert_partition_clause" + ] = impala_helper.get_insert_sql_partition_clause(partitions) + + impala_helper.ensure_table_format_is_parquet(table_name, table_description) + + source_view_full_name = "`{source_schema}`.`{source_view}`".format(**args) + raw_source_view_has_results = job_input.execute_query( + """ + WITH limited_view AS (SELECT * FROM {} LIMIT 1) + SELECT COUNT(1) > 0 FROM limited_view + """.format( + source_view_full_name + ) + ) + source_view_has_results = raw_source_view_has_results[0][0] + if not source_view_has_results: + log.info("Source view returns no results. Will NOT execute template!") + raise Exception( + "Source view returns no results. Will NOT execute template!" + ) + return args + + def _validate_args(self, args: dict) -> dict: + try: + return self.TemplateParams(**args).dict() + except ValidationError as error: + errors.log_and_rethrow( + to_be_fixed_by=errors.ResolvableBy.USER_ERROR, + log=log, + what_happened="Template execution in Data Job finished with error", + why_it_happened=errors.MSG_WHY_FROM_EXCEPTION(error), + consequences=errors.MSG_CONSEQUENCE_TERMINATING_APP, + countermeasures=errors.MSG_COUNTERMEASURE_FIX_PARENT_EXCEPTION, + exception=error, + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/01_prepare_input_data.py new file mode 100644 index 0000000000..849163ff1c --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/01_prepare_input_data.py @@ -0,0 +1,81 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for an scd1 template test. +""" +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + + # job_input.execute_query(u""" + # CREATE DATABASE IF NOT EXISTS `{target_schema}` + # """) + + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `org_id` INT, + `org_name` STRING, + `org_type` STRING, + `company_name` STRING, + `sddc_limit` INT, + `org_host_limit` INT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` VALUES ( + (2, "johnlocke@vmware.com" , "CUSTOMER_POC" , "VMware" , 1, 6 ), + (3, "lilly.johnsonn@goofys.com", "CUSTOMER" , "Goofy's" , 2, 16), + (4, "jilliandoe@uncanny.ca" , "PARTNER_SISO" , "Uncanny Company" , 2, 16), + (5, "jane.doe@vmware.com" , "CUSTOMER" , "VMware" , 2, 32), + (6, "john.doe@pharmamed.com" , "CUSTOMER" , "PharmaMed" , 1, 32), + (7, "andrej.maya@acme.com" , "PARTNER_SISO" , "ACME" , 1, 32), + (8, "guang@vmware.com" , "INTERNAL_CORE" , "VMware" , 4, 32) + ) + """ + ) + + # Step 2: create a table that represents the next state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `org_id` INT, + `org_name` STRING, + `org_type` STRING, + `company_name` STRING, + `sddc_limit` INT, + `org_host_limit` INT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + (1, "mullen@actual.com" , "CUSTOMER_MSP_TENANT", "actual Master Org", 2, 32), + (2, "johnlocke@vmware.com" , "CUSTOMER_POC" , "VMware" , 1, 6 ), + (3, "lilly.johnsonn@goofys.com", "CUSTOMER" , "Goofy's" , 2, 32), + (4, "jilliandoe@uncanny.ca" , "PARTNER_SISO" , "Uncanny Company" , 2, 32), + (5, "jane.doe@vmware.com" , "CUSTOMER" , "VMware" , 2, 32), + (6, "john.doe@pharmamed.com" , "CUSTOMER" , "PharmaMed" , 2, 32), + (7, "andrej.maya@acme.com" , "PARTNER_SISO" , "ACME" , 2, 32), + (8, "guang@vmware.com" , "INTERNAL_CORE" , "VMware" , 2, 32) + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/02_run_load_dimension_scd1_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/02_run_load_dimension_scd1_template.py new file mode 100644 index 0000000000..dd439171f1 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/02_run_load_dimension_scd1_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/dimension/scd1", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/01_run_load_dimension_scd1_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/01_run_load_dimension_scd1_template.py new file mode 100644 index 0000000000..dd439171f1 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/01_run_load_dimension_scd1_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/dimension/scd1", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_only/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/01_prepare_input_data.py new file mode 100644 index 0000000000..b47cbc3ab0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/01_prepare_input_data.py @@ -0,0 +1,67 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for an scd1 template test. +""" +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. 
-- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `org_id` INT, + `org_name` STRING, + `sddc_limit` INT, + `org_host_limit` INT + ) + PARTITIONED BY (`org_type` STRING, `company_name` STRING) + STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + TRUNCATE `{target_schema}`.`{target_table}` + """ + ) + # Step 2: create a table that represents the next state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `org_id` INT, + `org_name` STRING, + `sddc_limit` INT, + `org_host_limit` INT, + `org_type` STRING, + `company_name` STRING + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + (1, "mullen@actual.com" , 2, 32, "CUSTOMER_MSP_TENANT", "actual Master Org"), + (2, "johnlocke@vmware.com" , 1, 6 , "CUSTOMER_POC" , "VMware" ), + (3, "lilly.johnsonn@goofys.com", 2, 32, "CUSTOMER" , "Goofy" ), + (4, "jilliandoe@uncanny.ca" , 2, 32, "PARTNER_SISO" , "Uncanny Company" ), + (5, "jane.doe@vmware.com" , 2, 32, "CUSTOMER" , "VMware" ), + (6, "john.doe@pharmamed.com" , 2, 32, "CUSTOMER" , "PharmaMed" ), + (7, "andrej.maya@acme.com" , 2, 32, "PARTNER_SISO" , "ACME" ), + (8, "guang@vmware.com" , 2, 32, "INTERNAL_CORE" , "VMware" ) + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/02_run_load_dimension_scd1_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/02_run_load_dimension_scd1_template.py new file mode 100644 index 0000000000..dd439171f1 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/02_run_load_dimension_scd1_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/dimension/scd1", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd1_template_partition_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py new file mode 100644 index 0000000000..deab47d4d4 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/01_prepare_input_data.py @@ -0,0 +1,193 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for an scd2 template test. + +The data is constructed working backwards from the current scd2 template definition as follows. + +We have a source relation `S` (usually a view) and a target relation `T`. The elements in these two relations are +uniquely identified by the (`{id_column}`, `{start_time_column}`) composite key. An element is said to be "current in (a +relation) R" if and only if its `{end_time_column}` equals the user-defined `{end_time_default_value}` (which usually is +the largest possible timestamp). + + +We partition the items in `T` according to the following predicates: + +- `C`: elements that are current in `T`, +- `P`: elements that are present in `S`, +- `M`: elements that are modified in `S`. + +Obviously, it does not make sense to distinguish between modified or unmodified elements if these elements are not +present in `S`. In other words, `M` does not further partition equivalence classes that contain `¬P`. + +``` + C ∧ ¬P ∧ M = C ∧ ¬P ∧ ¬M = C ∧ ¬P +¬C ∧ ¬P ∧ M = ¬C ∧ ¬P ∧ ¬M = ¬C ∧ ¬P +``` + +In total, this means that `T` is partitioned into the following six equivalence classes. + +``` + C ∧ P ∧ M + C ∧ P ∧ ¬M + C ∧ ¬P +¬C ∧ P ∧ M +¬C ∧ P ∧ ¬M +¬C ∧ ¬P +``` + +The user contract that the load.dimension.scd2 template defines is as follows: + +1. Non-current state that is not present in the source view (¬C ∧ ¬P) is retained in the target view. +2. Non-current state that is present in the source view (¬C ∧ P) is overridden in the target view. +3. Current state that is not present in the source view (C ∧ ¬P) is dropped from the target view. In other words, + all current state should be present in the source view in order to avoid data loss. + +The sample data loaded here defines entries for each non-empty class in the target relation. We load the expected +result of the application in a third relation `R`. The data in `R` and in the updated `T` relations should be the same +up to differences in the surrogate keys of the new items present in `S`. +""" +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. 
-- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `{surrogate_key_column}` STRING, + `{id_column}` SMALLINT, + `{start_time_column}` TIMESTAMP, + `{end_time_column}` TIMESTAMP, + gender CHAR(1), + name STRING + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + REFRESH `{target_schema}`.`{target_table}` + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` VALUES ( + ("p10", 1, "1400-01-01", "9999-12-31", CAST("m" AS CHAR(1)), "Alfred Hitchcock" ), -- C ∧ P ∧ ¬M + + ("p20", 2, "1400-01-01", "2019-10-24", CAST("m" AS CHAR(1)), "ANDREI TARKOVSKY" ), -- ¬C ∧ ¬P + ("p21", 2, "2019-10-24", "9999-12-31", CAST("m" AS CHAR(1)), "Andrii Tarkowski" ), -- C ∧ P ∧ M + + ("p30", 3, "1400-01-01", "9999-12-31", CAST("m" AS CHAR(1)), "Ingmar Bergman" ), -- C ∧ ¬P + + ("p40", 4, "1400-01-01", "2009-01-01", CAST("m" AS CHAR(1)), "Laurence WACHOWSKI"), -- ¬C ∧ P ∧ M + ("p41", 4, "2009-01-01", "9999-12-31", CAST("m" AS CHAR(1)), "Lana Washowski" ), -- C ∧ P ∧ M + + ("p50", 5, "1400-01-01", "2016-03-01", CAST("m" AS CHAR(1)), "Andrew Wachowski" ), -- ¬C ∧ P ∧ ¬M + ("p51", 5, "2016-03-01", "9999-12-31", CAST("f" AS CHAR(1)), "Andrew Wachowski" ) -- C ∧ P ∧ ¬M + ) + """ + ) + job_input.execute_query( + """ + REFRESH `{target_schema}`.`{target_table}` + """ + ) + + # Step 2: create a table that represents the delta to be applied + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `{id_column}` SMALLINT, + `{start_time_column}` TIMESTAMP, + `{end_time_column}` TIMESTAMP, + gender CHAR(1), + name STRING + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + REFRESH `{source_schema}`.`{source_view}` + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + (1, "1400-01-01", "9999-12-31", CAST("m" AS CHAR(1)), "Alfred Hitchcock" ), -- p10: unmodified + + (2, "2019-10-24", "9999-12-31", CAST("m" AS CHAR(1)), "Andrei Tarkovsky" ), -- p21: fix typos in name + + (4, "1400-01-01", "2009-01-01", CAST("m" AS CHAR(1)), "Laurence Wachowski"), -- p40: fix case in name + (4, "2009-01-01", "2018-12-31", CAST("f" AS CHAR(1)), "Lana Washowski" ), -- p41: fix gender (closing) + (4, "2018-12-31", "9999-12-31", CAST("f" AS CHAR(1)), "Lana Wachowski" ), -- p42: change name (new) + + (5, "1400-01-01", "2016-03-01", CAST("m" AS CHAR(1)), "Andrew Wachowski" ), -- p50: unmodified + (5, "2016-03-01", "2018-12-31", CAST("f" AS CHAR(1)), "Andrew Wachowski" ), -- p51: unmodified (closing) + (5, "2018-12-31", "9999-12-31", CAST("f" AS CHAR(1)), "Lilly Wachowski" ) -- p52: change name (new) + ) + """ + ) + job_input.execute_query( + """ + REFRESH `{source_schema}`.`{source_view}` + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the given delta + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` ( + `{surrogate_key_column}` STRING, + `{id_column}` SMALLINT, + `{start_time_column}` 
TIMESTAMP, + `{end_time_column}` TIMESTAMP, + gender CHAR(1), + name STRING + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + REFRESH `{expect_schema}`.`{expect_table}` + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{expect_schema}`.`{expect_table}` VALUES ( + ("p10", 1, "1400-01-01", "9999-12-31", CAST("m" AS CHAR(1)), "Alfred Hitchcock" ), -- C ∧ P ∧ ¬M + + ("p20", 2, "1400-01-01", "2019-10-24", CAST("m" AS CHAR(1)), "ANDREI TARKOVSKY" ), -- ¬C ∧ ¬P + ("p21", 2, "2019-10-24", "9999-12-31", CAST("m" AS CHAR(1)), "Andrei Tarkovsky" ), -- C ∧ P ∧ M + + ("p40", 4, "1400-01-01", "2009-01-01", CAST("m" AS CHAR(1)), "Laurence Wachowski"), -- ¬C ∧ P ∧ M + ("p41", 4, "2009-01-01", "2018-12-31", CAST("f" AS CHAR(1)), "Lana Washowski" ), -- C ∧ P ∧ M + ("p42", 4, "2018-12-31", "9999-12-31", CAST("f" AS CHAR(1)), "Lana Wachowski" ), -- C ∧ P ∧ M (new) + + ("p50", 5, "1400-01-01", "2016-03-01", CAST("m" AS CHAR(1)), "Andrew Wachowski" ), -- ¬C ∧ P ∧ ¬M + ("p51", 5, "2016-03-01", "2018-12-31", CAST("f" AS CHAR(1)), "Andrew Wachowski" ), -- C ∧ P ∧ ¬M + ("p52", 5, "2018-12-31", "9999-12-31", CAST("f" AS CHAR(1)), "Lilly Wachowski" ) -- C ∧ P ∧ ¬M (new) + ) + """ + ) + job_input.execute_query( + """ + REFRESH `{expect_schema}`.`{expect_table}` + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/02_run_load_dimension_scd2_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/02_run_load_dimension_scd2_template.py new file mode 100644 index 0000000000..a91e341f27 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/02_run_load_dimension_scd2_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/dimension/scd2", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/01_run_load_dimension_scd2_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/01_run_load_dimension_scd2_template.py new file mode 100644 index 0000000000..a91e341f27 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/01_run_load_dimension_scd2_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. 
-- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/dimension/scd2", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_dimension_scd2_template_only/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py new file mode 100644 index 0000000000..d562001bee --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/01_prepare_input_data.py @@ -0,0 +1,128 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `dim_sddc_sk` STRING, + `dim_org_id` INT, + `dim_date_id` TIMESTAMP, + `host_count` BIGINT, + `cluster_count` BIGINT, + `{last_arrival_ts}` TIMESTAMP + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` VALUES ( + -- 2019-11-18 + ("sddc01-r01", 1, "2019-11-18", 5 , 1, "2019-11-18 09:00:00"), + ("sddc02-r01", 2, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"), + ("sddc03-r01", 3, "2019-11-18", 12, 3, "2019-11-18 09:00:00"), + ("sddc04-r01", 4, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"), + -- 2019-11-19 + ("sddc01-r01", 1, "2019-11-19", 5 , 1, "2019-11-19 09:00:00"), + ("sddc02-r01", 2, "2019-11-19", 4 , 1, "2019-11-19 09:00:00"), + ("sddc03-r01", 3, "2019-11-19", 13, 3, "2019-11-19 09:00:00"), + ("sddc04-r01", 4, "2019-11-19", 3 , 1, "2019-11-19 09:00:00"), + ("sddc05-r02", 5, "2019-11-19", 20, 4, "2019-11-19 09:00:00") + ) + """ + ) + + # Step 2: create a table that represents the next snapshot + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `dim_sddc_sk` STRING, + `dim_org_id` INT, + `dim_date_id` TIMESTAMP, + `host_count` BIGINT, + `cluster_count` BIGINT, + `{last_arrival_ts}` TIMESTAMP + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + -- 2019-11-18 + ("sddc05-r01", 5, "2019-11-18", 18, 4, "2019-11-18 09:30:00"), -- late arrival + -- 2019-11-19 (duplicated) + ("sddc01-r01", 1, "2019-11-19", 5 , 1, "2019-11-19 09:00:00"), -- duplicated + ("sddc02-r01", 2, "2019-11-19", 4 , 1, "2019-11-19 09:00:00"), -- duplicated + ("sddc03-r01", 3, "2019-11-19", 13, 3, "2019-11-19 09:00:00"), -- duplicated + ("sddc04-r01", 4, "2019-11-19", 3 , 1, "2019-11-19 09:00:00"), -- duplicated + 
("sddc05-r02", 5, "2019-11-19", 20, 5, "2019-11-19 09:00:00"), -- changed + -- 2019-11-20 + ("sddc01-r01", 1, "2019-11-20", 10, 2, "2019-11-20 09:00:00"), -- new + ("sddc02-r02", 2, "2019-11-20", 7 , 1, "2019-11-20 09:00:00"), -- new + ("sddc03-r01", 3, "2019-11-20", 13, 3, "2019-11-20 09:00:00"), -- new + ("sddc04-r01", 4, "2019-11-20", 3 , 1, "2019-11-20 09:00:00"), -- new + ("sddc05-r04", 5, "2019-11-20", 3 , 1, "2019-11-20 09:00:00"), -- new + ("sddc06-r01", 1, "2019-11-20", 3 , 1, "2019-11-20 09:00:00") -- new + ) + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the next snapshot + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` ( + `dim_sddc_sk` STRING, + `dim_org_id` INT, + `dim_date_id` TIMESTAMP, + `host_count` BIGINT, + `cluster_count` BIGINT, + `{last_arrival_ts}` TIMESTAMP + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{expect_schema}`.`{expect_table}` VALUES ( + -- 2019-11-18 + ("sddc01-r01", 1, "2019-11-18", 5 , 1, "2019-11-18 09:00:00"), + ("sddc02-r01", 2, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"), + ("sddc03-r01", 3, "2019-11-18", 12, 3, "2019-11-18 09:00:00"), + ("sddc04-r01", 4, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"), + ("sddc05-r01", 5, "2019-11-18", 18, 4, "2019-11-18 09:30:00"), + -- 2019-11-19 (duplicated) + ("sddc01-r01", 1, "2019-11-19", 5 , 1, "2019-11-19 09:00:00"), + ("sddc02-r01", 2, "2019-11-19", 4 , 1, "2019-11-19 09:00:00"), + ("sddc03-r01", 3, "2019-11-19", 13, 3, "2019-11-19 09:00:00"), + ("sddc04-r01", 4, "2019-11-19", 3 , 1, "2019-11-19 09:00:00"), + ("sddc05-r02", 5, "2019-11-19", 20, 5, "2019-11-19 09:00:00"), + -- 2019-11-20 + ("sddc01-r01", 1, "2019-11-20", 10, 2, "2019-11-20 09:00:00"), + ("sddc02-r02", 2, "2019-11-20", 7 , 1, "2019-11-20 09:00:00"), + ("sddc03-r01", 3, "2019-11-20", 13, 3, "2019-11-20 09:00:00"), + ("sddc04-r01", 4, "2019-11-20", 3 , 1, "2019-11-20 09:00:00"), + ("sddc05-r04", 5, "2019-11-20", 3 , 1, "2019-11-20 09:00:00"), + ("sddc06-r01", 1, "2019-11-20", 3 , 1, "2019-11-20 09:00:00") + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py new file mode 100644 index 0000000000..6d070d1dd0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/02_run_load_fact_snapshot_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/fact/snapshot", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0
diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py
new file mode 100644
index 0000000000..9dc3d4f5bc
--- /dev/null
+++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/01_prepare_input_data.py
@@ -0,0 +1,105 @@
+# Copyright 2021 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+from vdk.api.job_input import IJobInput
+
+
+__author__ = "VMware, Inc."
+__copyright__ = (
+    "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential"
+)
+
+
+def run(job_input: IJobInput) -> None:
+    # Step 1: create a table that represents the current state
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{target_schema}`.`{target_table}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `cluster_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP
+        ) STORED AS PARQUET
+        """
+    )
+    job_input.execute_query(
+        """
+        INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` VALUES (
+            -- 2019-11-18
+            ("sddc01-r01", 1, "2019-11-18", 5 , 1, "2019-11-18 09:00:00"),
+            ("sddc02-r01", 2, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"),
+            ("sddc03-r01", 3, "2019-11-18", 12, 3, "2019-11-18 09:00:00"),
+            ("sddc04-r01", 4, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"),
+            -- 2019-11-19
+            ("sddc01-r01", 1, "2019-11-19", 5 , 1, "2019-11-19 09:00:00"),
+            ("sddc02-r01", 2, "2019-11-19", 4 , 1, "2019-11-19 09:00:00"),
+            ("sddc03-r01", 3, "2019-11-19", 13, 3, "2019-11-19 09:00:00"),
+            ("sddc04-r01", 4, "2019-11-19", 3 , 1, "2019-11-19 09:00:00"),
+            ("sddc05-r02", 5, "2019-11-19", 20, 4, "2019-11-19 09:00:00")
+        )
+        """
+    )
+
+    # Step 2: create a table that represents the next snapshot
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{source_schema}`.`{source_view}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `cluster_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP
+        ) STORED AS PARQUET
+        """
+    )
+    # We are testing the case when the next snapshot is empty
+    job_input.execute_query(
+        """
+        TRUNCATE `{source_schema}`.`{source_view}`
+        """
+    )
+
+    # Step 3: Create a table containing the state expected after updating the current state with the next snapshot
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `cluster_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP
+        ) STORED AS PARQUET
+        """
+    )
+    job_input.execute_query(
+        """
+        INSERT OVERWRITE TABLE `{expect_schema}`.`{expect_table}` VALUES (
+            -- 2019-11-18
+            ("sddc01-r01", 1, "2019-11-18", 5 , 1, "2019-11-18 09:00:00"),
+            ("sddc02-r01", 2, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"),
+            ("sddc03-r01", 3, "2019-11-18", 12, 3, "2019-11-18 09:00:00"),
+            ("sddc04-r01", 4, "2019-11-18", 4 , 1, "2019-11-18 09:00:00"),
+            -- 2019-11-19
+            ("sddc01-r01", 1, "2019-11-19", 5 , 1, "2019-11-19 09:00:00"),
+
("sddc02-r01", 2, "2019-11-19", 4 , 1, "2019-11-19 09:00:00"), + ("sddc03-r01", 3, "2019-11-19", 13, 3, "2019-11-19 09:00:00"), + ("sddc04-r01", 4, "2019-11-19", 3 , 1, "2019-11-19 09:00:00"), + ("sddc05-r02", 5, "2019-11-19", 20, 4, "2019-11-19 09:00:00") + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py new file mode 100644 index 0000000000..6d070d1dd0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/02_run_load_fact_snapshot_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/fact/snapshot", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_job_empty_source/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/run_fact_snapshot_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/run_fact_snapshot_template.py new file mode 100644 index 0000000000..6d070d1dd0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_only/run_fact_snapshot_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/fact/snapshot", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/01_prepare_input_data.py new file mode 100644 index 0000000000..6dfc8c32ee --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/01_prepare_input_data.py @@ -0,0 +1,132 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0
+from vdk.api.job_input import IJobInput
+
+
+__author__ = "VMware, Inc."
+__copyright__ = (
+    "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential"
+)
+
+
+def run(job_input: IJobInput) -> None:
+    # Step 1: create a table that represents the current state
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{target_schema}`.`{target_table}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP
+        ) PARTITIONED BY (`cluster_count` BIGINT) STORED AS PARQUET
+        """
+    )
+    job_input.execute_query(
+        """
+        TRUNCATE `{target_schema}`.`{target_table}`
+        """
+    )
+    job_input.execute_query(
+        """
+        INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` PARTITION (cluster_count) VALUES (
+            -- 2019-11-18
+            ("sddc01-r01", 1, "2019-11-18", 5 , "2019-11-18 09:00:00", 1),
+            ("sddc02-r01", 2, "2019-11-18", 4 , "2019-11-18 09:00:00", 1),
+            ("sddc03-r01", 3, "2019-11-18", 12, "2019-11-18 09:00:00", 3),
+            ("sddc04-r01", 4, "2019-11-18", 4 , "2019-11-18 09:00:00", 1),
+            -- 2019-11-19
+            ("sddc01-r01", 1, "2019-11-19", 5 , "2019-11-19 09:00:00", 1),
+            ("sddc02-r01", 2, "2019-11-19", 4 , "2019-11-19 09:00:00", 1),
+            ("sddc03-r01", 3, "2019-11-19", 13, "2019-11-19 09:00:00", 3),
+            ("sddc04-r01", 4, "2019-11-19", 3 , "2019-11-19 09:00:00", 1),
+            ("sddc05-r02", 5, "2019-11-19", 20, "2019-11-19 09:00:00", 4)
+        )
+        """
+    )
+
+    # Step 2: create a table that represents the next snapshot
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{source_schema}`.`{source_view}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP,
+            `cluster_count` BIGINT
+        ) STORED AS PARQUET
+        """
+    )
+    job_input.execute_query(
+        """
+        INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES (
+            -- 2019-11-18
+            ("sddc05-r01", 5, "2019-11-18", 18, "2019-11-18 09:30:00", 4), -- late arrival
+            -- 2019-11-19 (duplicated)
+            ("sddc01-r01", 1, "2019-11-19", 5 , "2019-11-19 09:00:00", 1), -- duplicated
+            ("sddc02-r01", 2, "2019-11-19", 4 , "2019-11-19 09:00:00", 1), -- duplicated
+            ("sddc03-r01", 3, "2019-11-19", 13, "2019-11-19 09:00:00", 3), -- duplicated
+            ("sddc04-r01", 4, "2019-11-19", 3 , "2019-11-19 09:00:00", 1), -- duplicated
+            ("sddc05-r02", 5, "2019-11-19", 20, "2019-11-19 09:00:00", 5), -- changed
+            -- 2019-11-20
+            ("sddc01-r01", 1, "2019-11-20", 10, "2019-11-20 09:00:00", 2), -- new
+            ("sddc02-r02", 2, "2019-11-20", 7 , "2019-11-20 09:00:00", 1), -- new
+            ("sddc03-r01", 3, "2019-11-20", 13, "2019-11-20 09:00:00", 3), -- new
+            ("sddc04-r01", 4, "2019-11-20", 3 , "2019-11-20 09:00:00", 1), -- new
+            ("sddc05-r04", 5, "2019-11-20", 3 , "2019-11-20 09:00:00", 1), -- new
+            ("sddc06-r01", 1, "2019-11-20", 3 , "2019-11-20 09:00:00", 1) -- new
+        )
+        """
+    )
+
+    # Step 3: Create a table containing the state expected after updating the current state with the next snapshot
+
+    # job_input.execute_query(u'''
+    #     DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}`
+    # ''')
+    job_input.execute_query(
+        """
+        CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` (
+            `dim_sddc_sk` STRING,
+            `dim_org_id` INT,
+            `dim_date_id` TIMESTAMP,
+            `host_count` BIGINT,
+            `{last_arrival_ts}` TIMESTAMP,
+            `cluster_count`
BIGINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{expect_schema}`.`{expect_table}` VALUES ( + -- 2019-11-18 + ("sddc01-r01", 1, "2019-11-18", 5 , "2019-11-18 09:00:00", 1), + ("sddc02-r01", 2, "2019-11-18", 4 , "2019-11-18 09:00:00", 1), + ("sddc03-r01", 3, "2019-11-18", 12, "2019-11-18 09:00:00", 3), + ("sddc04-r01", 4, "2019-11-18", 4 , "2019-11-18 09:00:00", 1), + ("sddc05-r01", 5, "2019-11-18", 18, "2019-11-18 09:30:00", 4), + -- 2019-11-19 (duplicated) + ("sddc01-r01", 1, "2019-11-19", 5 , "2019-11-19 09:00:00", 1), + ("sddc02-r01", 2, "2019-11-19", 4 , "2019-11-19 09:00:00", 1), + ("sddc03-r01", 3, "2019-11-19", 13, "2019-11-19 09:00:00", 3), + ("sddc04-r01", 4, "2019-11-19", 3 , "2019-11-19 09:00:00", 1), + ("sddc05-r02", 5, "2019-11-19", 20, "2019-11-19 09:00:00", 5), + -- 2019-11-20 + ("sddc01-r01", 1, "2019-11-20", 10, "2019-11-20 09:00:00", 2), + ("sddc02-r02", 2, "2019-11-20", 7 , "2019-11-20 09:00:00", 1), + ("sddc03-r01", 3, "2019-11-20", 13, "2019-11-20 09:00:00", 3), + ("sddc04-r01", 4, "2019-11-20", 3 , "2019-11-20 09:00:00", 1), + ("sddc05-r04", 5, "2019-11-20", 3 , "2019-11-20 09:00:00", 1), + ("sddc06-r01", 1, "2019-11-20", 3 , "2019-11-20 09:00:00", 1) + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/02_run_load_fact_snapshot_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/02_run_load_fact_snapshot_template.py new file mode 100644 index 0000000000..6d070d1dd0 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/02_run_load_fact_snapshot_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/fact/snapshot", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_fact_snapshot_template_partition_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/01_prepare_input_data.py new file mode 100644 index 0000000000..f396876046 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/01_prepare_input_data.py @@ -0,0 +1,120 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for an scd2 template test. +""" +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. 
-- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `{sk_column}` STRING, + `{active_from_column}` TIMESTAMP, + `{active_to_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING, + `version` SMALLINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` VALUES ( + ("sddc01-v01", "2019-01-01", "9999-12-31", 1, 7, "RUNNING" , false, 'Azure', 498), + ("sddc02-v01", "2019-02-01", "9999-12-31", 2, 9, "STOPPED" , false, 'AWS' , 500), + ("sddc03-v01", "2019-03-01", "9999-12-31", 3, 3, "PROVISIONING", false, 'Azure', 497), + ("sddc04-v01", "2019-04-01", "9999-12-31", 4, 5, "PROVISIONING", true , 'Azure', 498), + ("sddc05-v01", "2019-05-01", "2019-05-02", 5, 9, "STARTING" , true , 'AWS' , 500), + ("sddc05-v02", "2019-05-02", "2019-05-03", 5, 2, "STARTING" , true , 'AWS' , 500), + ("sddc05-v03", "2019-05-03", "9999-12-31", 5, 3, "STARTING" , true , 'AWS' , 500) + ) + """ + ) + + # Step 2: create a table that represents the delta to be applied + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `{updated_at_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING, + `version` SMALLINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + ("2019-02-02", 2, 1, "STARTING" , false, 'AWS' , 500), -- Update (1) - new time, new values + ("2019-03-01", 3, 4, "RUNNING" , false, 'Azure', 497), -- Update (2) - same time, new values + ("2019-04-02", 4, 5, "PROVISIONING", true , 'Azure', 498), -- Update (3) - new time, same values + ("2019-05-01", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (4) - same time, same values + ("2019-05-02", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (5) - same time, prev values + ("2019-05-04", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (1) - new time, new values + ("2019-06-01", 6, 9, "STARTING" , true , 'AWS' , 499) -- Insert + ) + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the given delta + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` ( + `{sk_column}` STRING, + `{active_from_column}` TIMESTAMP, + `{active_to_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING, + `version` SMALLINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{expect_schema}`.`{expect_table}` VALUES ( + ("sddc01-v01", "2019-01-01", "9999-12-31", 1, 7, "RUNNING" , false, 'Azure', 498), + + ("sddc02-v01", "2019-02-01", "2019-02-02", 2, 9, "STOPPED" , false, 'AWS' , 500), + ("sddc02-v02", "2019-02-02", "9999-12-31", 2, 1, "STARTING" , false, 'AWS' , 500), + + ("sddc03-v01", "2019-03-01", "9999-12-31", 3, 4, "RUNNING" , false, 'Azure', 
497), + + ("sddc04-v01", "2019-04-01", "9999-12-31", 4, 5, "PROVISIONING", true , 'Azure', 498), + + ("sddc05-v01", "2019-05-01", "2019-05-03", 5, 9, "STARTING" , true , 'AWS' , 500), + ("sddc05-v03", "2019-05-03", "2019-05-04", 5, 3, "STARTING" , true , 'AWS' , 500), + ("sddc05-v04", "2019-05-04", "9999-12-31", 5, 9, "STARTING" , true , 'AWS' , 500), + + ("sddc06-v01", "2019-06-01", "9999-12-31", 6, 9, "STARTING" , true , 'AWS' , 499) + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/02_run_load_versioned_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/02_run_load_versioned_template.py new file mode 100644 index 0000000000..e5e13a815e --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/02_run_load_versioned_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/versioned", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/01_run_versioned_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/01_run_versioned_template.py new file mode 100644 index 0000000000..e5e13a815e --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/01_run_versioned_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/versioned", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_only/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/01_prepare_input_data.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/01_prepare_input_data.py new file mode 100644 index 0000000000..b6bc6bdcc3 --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/01_prepare_input_data.py @@ -0,0 +1,124 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +""" +Load example input data for an scd2 template test. +""" +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + # Step 1: create a table that represents the current state + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{target_schema}`.`{target_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{target_schema}`.`{target_table}` ( + `{sk_column}` STRING, + `{active_from_column}` TIMESTAMP, + `{active_to_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING + ) PARTITIONED BY (`version` SMALLINT) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + TRUNCATE `{target_schema}`.`{target_table}` + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{target_schema}`.`{target_table}` PARTITION (version) VALUES ( + ("sddc01-v01", "2019-01-01", "9999-12-31", 1, 7, "RUNNING" , false, 'Azure', 498), + ("sddc02-v01", "2019-02-01", "9999-12-31", 2, 9, "STOPPED" , false, 'AWS' , 500), + ("sddc03-v01", "2019-03-01", "9999-12-31", 3, 3, "PROVISIONING", false, 'Azure', 497), + ("sddc04-v01", "2019-04-01", "9999-12-31", 4, 5, "PROVISIONING", true , 'Azure', 498), + ("sddc05-v01", "2019-05-01", "2019-05-02", 5, 9, "STARTING" , true , 'AWS' , 500), + ("sddc05-v02", "2019-05-02", "2019-05-03", 5, 2, "STARTING" , true , 'AWS' , 500), + ("sddc05-v03", "2019-05-03", "9999-12-31", 5, 3, "STARTING" , true , 'AWS' , 500) + ) + """ + ) + + # Step 2: create a table that represents the delta to be applied + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{source_schema}`.`{source_view}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{source_schema}`.`{source_view}` ( + `{updated_at_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING, + `version` SMALLINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE `{source_schema}`.`{source_view}` VALUES ( + ("2019-02-02", 2, 1, "STARTING" , false, 'AWS' , 500), -- Update (1) - new time, new values + ("2019-03-01", 3, 4, "RUNNING" , false, 'Azure', 497), -- Update (2) - same time, new values + ("2019-04-02", 4, 5, "PROVISIONING", true , 'Azure', 498), -- Update (3) - new time, same values + ("2019-05-01", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (4) - same time, same values + ("2019-05-02", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (5) - same time, prev values + ("2019-05-04", 5, 9, "STARTING" , true , 'AWS' , 500), -- Update (1) - new time, new values + ("2019-06-01", 6, 9, "STARTING" , true , 'AWS' , 499) -- Insert + ) + """ + ) + + # Step 3: Create a table containing the state expected after updating the current state with the given delta + + # job_input.execute_query(u''' + # DROP TABLE IF EXISTS `{expect_schema}`.`{expect_table}` + # ''') + job_input.execute_query( + """ + CREATE TABLE IF NOT EXISTS `{expect_schema}`.`{expect_table}` ( + `{sk_column}` STRING, + `{active_from_column}` TIMESTAMP, + `{active_to_column}` TIMESTAMP, + `{id_column}` INT, + `updated_by_user_id` INT, + `state` STRING, + `is_nsxt` BOOLEAN, + `cloud_vendor` STRING, + `version` SMALLINT + ) STORED AS PARQUET + """ + ) + job_input.execute_query( + """ + INSERT OVERWRITE TABLE 
`{expect_schema}`.`{expect_table}` VALUES ( + ("sddc01-v01", "2019-01-01", "9999-12-31", 1, 7, "RUNNING" , false, 'Azure', 498), + + ("sddc02-v01", "2019-02-01", "2019-02-02", 2, 9, "STOPPED" , false, 'AWS' , 500), + ("sddc02-v02", "2019-02-02", "9999-12-31", 2, 1, "STARTING" , false, 'AWS' , 500), + + ("sddc03-v01", "2019-03-01", "9999-12-31", 3, 4, "RUNNING" , false, 'Azure', 497), + + ("sddc04-v01", "2019-04-01", "9999-12-31", 4, 5, "PROVISIONING", true , 'Azure', 498), + + ("sddc05-v01", "2019-05-01", "2019-05-03", 5, 9, "STARTING" , true , 'AWS' , 500), + ("sddc05-v03", "2019-05-03", "2019-05-04", 5, 3, "STARTING" , true , 'AWS' , 500), + ("sddc05-v04", "2019-05-04", "9999-12-31", 5, 9, "STARTING" , true , 'AWS' , 500), + + ("sddc06-v01", "2019-06-01", "9999-12-31", 6, 9, "STARTING" , true , 'AWS' , 499) + ) + """ + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/02_run_load_versioned_template.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/02_run_load_versioned_template.py new file mode 100644 index 0000000000..e5e13a815e --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/02_run_load_versioned_template.py @@ -0,0 +1,15 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from vdk.api.job_input import IJobInput + +__author__ = "VMware, Inc." +__copyright__ = ( + "Copyright 2019 VMware, Inc. All rights reserved. -- VMware Confidential" +) + + +def run(job_input: IJobInput) -> None: + job_input.execute_template( + template_name="load/versioned", + template_args=job_input.get_arguments(), + ) diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/__init__.py b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/__init__.py new file mode 100644 index 0000000000..50c007580a --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/jobs/load_versioned_template_partition_job/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 diff --git a/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py new file mode 100644 index 0000000000..0d4493ff9c --- /dev/null +++ b/projects/vdk-plugins/vdk-impala/tests/functional/template_regression_test.py @@ -0,0 +1,603 @@ +# Copyright 2021 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +import json +import os +import pathlib +import re +import time +import unittest +from unittest.mock import ANY +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from vdk.internal.core import errors +from vdk.plugin.impala import impala_plugin +from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner +from vdk.plugin.test_utils.util_funcs import get_test_job_path + +VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE" +VDK_IMPALA_HOST = "VDK_IMPALA_HOST" +VDK_IMPALA_PORT = "VDK_IMPALA_PORT" + + +@patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "IMPALA", + VDK_IMPALA_HOST: "localhost", + VDK_IMPALA_PORT: "21050", + }, +) +@pytest.mark.usefixtures("impala_service") +class TemplateRegressionTests(unittest.TestCase): + def setUp(self) -> None: + self.__runner = CliEntryBasedTestRunner(impala_plugin) + time.sleep(10) # wait for impala instance to come online + self._run_query("CREATE DATABASE IF NOT EXISTS vdkprototypes") + + def test_load_dimension_scd1(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_dim_org" + target_table = "dw_dim_org" + + res = self._run_job( + "load_dimension_scd1_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + }, + ) + assert not res.exception + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{source_view}") + assert actual_rs.output and expected_rs.output + assert actual_rs.output == expected_rs.output + + def test_load_dimension_scd1_partitioned(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_dim_org_partition_test" + target_table = "dw_dim_org_partitioned" + + res = self._run_job( + "load_dimension_scd1_template_partition_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{source_view}") + + actual = {x for x in actual_rs.output.split("\n")} + expected = {x for x in expected_rs.output.split("\n")} + + assert actual_rs.output and expected_rs.output + self.assertSetEqual( + expected, actual, f"Elements in {source_view} and {target_table} differ." 
+ ) + + def test_load_dimension_scd1_parameter_validation(self) -> None: + self._run_template_with_bad_arguments( + template_name="load_dimension_scd1_template_only", + template_args={}, + num_exp_errors=4, + ) + self._run_template_with_bad_arguments( + template_name="load_dimension_scd1_template_only", + template_args={"source_view": "foo", "extra_parameter": "bar"}, + num_exp_errors=3, + ) + + def test_load_dimension_scd1_bad_target_schema(self) -> None: + template_args = { + "source_schema": "vdkprototypes", + "source_view": "vw_dim_org", + "target_schema": "vdkprototypes", + "target_table": "dw_dim_org_as_textfile", + } + + self._run_template_with_bad_target_schema( + template_name="load_dimension_scd1_template_only", + template_args=template_args, + ) + + def test_load_dimension_scd2(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_scmdb_people" + target_table = "dw_scmdb_people" + expect_table = "ex_scmdb_people" + + res = self._run_job( + "load_dimension_scd2_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "staging_schema": test_schema, + "expect_schema": test_schema, + "expect_table": expect_table, + "start_time_column": "start_time", + "end_time_column": "end_time", + "end_time_default_value": "9999-12-31", + "surrogate_key_column": "sk", + "id_column": "id", + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + # delete first (surrogate key) column from the two results, as those are uniquely generated and might differ + actual = {x[38:] for x in actual_rs.output.split("\n")} + expected = {x[5:] for x in expected_rs.output.split("\n")} + assert actual_rs.output and expected_rs.output + self.assertSetEqual( + expected, actual, f"Elements in {expect_table} and {target_table} differ." 
+ ) + + def test_load_dimension_scd2_parameter_validation(self) -> None: + self._run_template_with_bad_arguments( + template_name="load_dimension_scd2_template_only", + template_args={}, + num_exp_errors=9, + ) + self._run_template_with_bad_arguments( + template_name="load_dimension_scd2_template_only", + template_args={"source_view": "foo", "extra_parameter": "bar"}, + num_exp_errors=8, + ) + + def test_load_dimension_scd2_bad_target_schema(self) -> None: + template_args = { + "source_schema": "vdkprototypes", + "source_view": "vw_scmdb_people", + "target_schema": "vdkprototypes", + "target_table": "dw_fact_sddc_daily_as_textfile", + "staging_schema": "vdkprototypes", + "start_time_column": "start_time", + "end_time_column": "end_time", + "end_time_default_value": "9999-12-31", + "surrogate_key_column": "sk", + "id_column": "id", + } + + self._run_template_with_bad_target_schema( + template_name="load_dimension_scd2_template_only", + template_args=template_args, + ) + + def test_load_versioned(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_sddc_h_updates" + target_table = "dim_sddc_h" + expect_table = "ex_dim_sddc_h" + + res = self._run_job( + "load_versioned_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "id_column": "sddc_id", + "sk_column": "sddc_sk", + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + "version", + ], + "tracked_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "version", + ], + "active_from_column": "active_from", + "active_to_column": "active_to", + "active_to_max_value": "9999-12-31", + "updated_at_column": "updated_at", + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + assert actual_rs.output and expected_rs.output + # delete first (surrogate key) column from the two results, as those are uniquely generated and might differ + actual = {x[38:] for x in actual_rs.output.split("\n")} + expected = {x[12:] for x in expected_rs.output.split("\n")} + + self.assertSetEqual( + actual, expected, f"Elements in {expect_table} and {target_table} differ." 
+ ) + + def test_load_versioned_partitioned(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_sddc_h_updates_partition_test" + target_table = "dim_sddc_h_partitioned" + expect_table = "ex_dim_sddc_h_partition_test" + + res = self._run_job( + "load_versioned_template_partition_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "id_column": "sddc_id", + "sk_column": "sddc_sk", + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + "version", + ], + "tracked_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "version", + ], + "active_from_column": "active_from", + "active_to_column": "active_to", + "active_to_max_value": "9999-12-31", + "updated_at_column": "updated_at", + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + assert actual_rs.output and expected_rs.output + # delete first (surrogate key) column from the two results, as those are uniquely generated and might differ + actual = {x[38:] for x in actual_rs.output.split("\n")} + expected = {x[12:] for x in expected_rs.output.split("\n")} + + self.assertSetEqual( + actual, expected, f"Elements in {expect_table} and {target_table} differ." + ) + + def test_load_versioned_parameter_validation(self) -> None: + self._run_template_with_bad_arguments( + template_name="load_versioned_template_only", + template_args={}, + num_exp_errors=7, + ) + + good_template_args = { + "source_schema": "vdkprototypes", + "source_view": "vw_sddc_h_updates", + "target_schema": "vdkprototypes", + "target_table": "dim_sddc_h_as_textfile", + "id_column": "sddc_id", + "sk_column": "sddc_sk", + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + "version", + ], + "tracked_columns": ["updated_by_user_id", "state", "is_nsxt", "version"], + "active_from_column": "active_from", + "active_to_column": "active_to", + "active_to_max_value": "9999-12-31", + "updated_at_column": "updated_at", + "extra_parameter": "bar", + } + + self._run_template_with_bad_arguments( + template_name="load_versioned_template_only", + template_args={ + **good_template_args, + **{ + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + ], + "tracked_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "version", + ], + }, + }, + num_exp_errors=1, + ) + + self._run_template_with_bad_arguments( + template_name="load_versioned_template_only", + template_args={ + **good_template_args, + **{ + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + ], + "tracked_columns": [], + }, + }, + num_exp_errors=1, + ) + + def test_load_versioned_bad_target_schema(self) -> None: + template_args = { + "source_schema": "vdkprototypes", + "source_view": "vw_sddc_h_updates", + "target_schema": "vdkprototypes", + "target_table": "dim_sddc_h_as_textfile", + "id_column": "sddc_id", + "sk_column": "sddc_sk", + "value_columns": [ + "updated_by_user_id", + "state", + "is_nsxt", + "cloud_vendor", + "version", + ], + "tracked_columns": ["updated_by_user_id", "state", "is_nsxt", "version"], + "active_from_column": "active_from", + "active_to_column": "active_to", + "active_to_max_value": "9999-12-31", + "updated_at_column": "updated_at", + } + + 
self._run_template_with_bad_target_schema( + template_name="load_versioned_template_only", + template_args=template_args, + ) + + def test_load_fact_snapshot(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_fact_sddc_daily" + target_table = "dw_fact_sddc_daily" + expect_table = "ex_fact_sddc_daily" + + res = self._run_job( + "load_fact_snapshot_template_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "last_arrival_ts": "updated_at", + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + assert actual_rs.output and expected_rs.output + + actual = {x for x in actual_rs.output.split("\n")} + expected = {x for x in expected_rs.output.split("\n")} + + self.assertSetEqual( + actual, expected, f"Elements in {expect_table} and {target_table} differ." + ) + + def test_load_fact_snapshot_empty_source(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_fact_sddc_daily_empty_source" + target_table = "dw_fact_sddc_daily_empty_source" + expect_table = "ex_fact_sddc_daily_empty_source" + + res = self._run_job( + "load_fact_snapshot_template_job_empty_source", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "last_arrival_ts": "updated_at", + }, + ) + # Expecting data job not to finish due to empty source. + assert res.exception + assert "Source view returns no results" in res.exception.args[0] + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + assert actual_rs.output and expected_rs.output + + actual = {x for x in actual_rs.output.split("\n")} + expected = {x for x in expected_rs.output.split("\n")} + + self.assertSetEqual( + actual, expected, f"Elements in {expect_table} and {target_table} differ." + ) + + def test_load_fact_snapshot_partition(self) -> None: + test_schema = "vdkprototypes" + source_view = "vw_fact_sddc_daily_partition" + target_table = "dw_fact_sddc_daily_partition" + expect_table = "ex_fact_sddc_daily_partition" + + res = self._run_job( + "load_fact_snapshot_template_partition_job", + { + "source_schema": test_schema, + "source_view": source_view, + "target_schema": test_schema, + "target_table": target_table, + "expect_schema": test_schema, + "expect_table": expect_table, + "last_arrival_ts": "updated_at", + }, + ) + assert not res.exception + + actual_rs = self._run_query(f"SELECT * FROM {test_schema}.{target_table}") + expected_rs = self._run_query(f"SELECT * FROM {test_schema}.{expect_table}") + assert actual_rs.output and expected_rs.output + + actual = {x for x in actual_rs.output.split("\n")} + expected = {x for x in expected_rs.output.split("\n")} + + self.assertSetEqual( + actual, expected, f"Elements in {expect_table} and {target_table} differ." 
+        )
+
+    def test_load_fact_snapshot_parameter_validation(self) -> None:
+        self._run_template_with_bad_arguments(
+            template_name="load_fact_snapshot_template_only",
+            template_args={},
+            num_exp_errors=5,
+        )
+        self._run_template_with_bad_arguments(
+            template_name="load_fact_snapshot_template_only",
+            template_args={"source_view": "foo", "target_table": None},
+            num_exp_errors=4,
+        )
+
+    def test_load_fact_snapshot_bad_target_schema(self) -> None:
+        template_args = {
+            "source_schema": "vdkprototypes",
+            "source_view": "vw_fact_sddc_daily",
+            "target_schema": "vdkprototypes",
+            "target_table": "dw_fact_sddc_daily_as_textfile",
+            "expect_schema": "vdkprototypes",
+            "last_arrival_ts": "updated_at",
+        }
+
+        self._run_template_with_bad_target_schema(
+            template_name="load_fact_snapshot_template_only",
+            template_args=template_args,
+        )
+
+    def _run_job(self, job_name: str, args: dict):
+        return self.__runner.invoke(
+            [
+                "run",
+                get_test_job_path(
+                    pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
+                    job_name,
+                ),
+                "--arguments",
+                json.dumps(args),
+            ]
+        )
+
+    def _run_query(self, query_string):
+        return self.__runner.invoke(
+            [
+                "impala-query",
+                "--query",
+                query_string,
+            ]
+        )
+
+    def _run_template_with_bad_arguments(
+        self, template_name: str, template_args: dict, num_exp_errors: int
+    ) -> None:
+        expected_error_message = (
+            f'{num_exp_errors} validation {"errors" if num_exp_errors > 1 else "error"} '
+            f"for {template_name} template"
+        )
+
+        def just_rethrow(*_, **kwargs):
+            raise Exception(expected_error_message)
+
+        # Patch log_and_rethrow so we can assert on how the template reports the
+        # validation errors; the side effect re-raises with a recognizable
+        # message that must then appear in the job output.
+        with patch.object(errors, "log_and_rethrow") as patched_log_and_rethrow:
+            patched_log_and_rethrow.side_effect = just_rethrow
+            result = self._run_job(template_name, template_args)
+            assert expected_error_message in result.output
+            assert (
+                patched_log_and_rethrow.call_args[1]["what_happened"]
+                == "Template execution in Data Job finished with error"
+            )
+            assert patched_log_and_rethrow.call_args[1]["why_it_happened"].startswith(
+                f"An exception occurred, exception message was: {num_exp_errors} validation error"
+            )
+            assert (
+                patched_log_and_rethrow.call_args[1]["consequences"]
+                == errors.MSG_CONSEQUENCE_TERMINATING_APP
+            )
+            assert (
+                patched_log_and_rethrow.call_args[1]["countermeasures"]
+                == errors.MSG_COUNTERMEASURE_FIX_PARENT_EXCEPTION
+            )
+
+    def _run_template_with_bad_target_schema(
+        self, template_name: str, template_args: dict
+    ) -> None:
+        self._run_query(
+            """
+            DROP TABLE IF EXISTS {target_schema}.{target_table}
+            """.format(
+                **template_args
+            )
+        )
+        # Create the target table as TEXTFILE so that the template's
+        # Parquet-only format check is expected to fail.
+        self._run_query(
+            """
+            CREATE TABLE {target_schema}.{target_table} (
+                attr_a INT,
+                attr_b STRING,
+                updated_at TIMESTAMP
+            ) STORED AS TEXTFILE
+            """.format(
+                **template_args
+            )
+        )
+        self._run_query(
+            """
+            REFRESH {target_schema}.{target_table}
+            """.format(
+                **template_args
+            )
+        )
+
+        table_name = "`{target_schema}`.`{target_table}`".format(**template_args)
+
+        expected_why_it_happened_msg = (
+            f'The target table {table_name} must be created with a "STORED AS PARQUET" '
+            f"clause. Please change the table definition accordingly and re-create the table."
+        )
+
+        def just_throw(*_, **kwargs):
+            raise Exception(expected_why_it_happened_msg)
+
+        # Patch log_and_throw via patch.object (as above) so the mock is
+        # automatically reverted after the test instead of permanently
+        # replacing the module attribute and leaking into other tests.
+        with patch.object(errors, "log_and_throw") as patched_log_and_throw:
+            patched_log_and_throw.side_effect = just_throw
+
+            res = self._run_job(template_name, template_args)
+            assert expected_why_it_happened_msg in res.output
+            patched_log_and_throw.assert_called_once_with(
+                to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
+                log=ANY,
+                what_happened="Data loading has failed.",
+                why_it_happened=(
+                    f"You are trying to load data into a table {table_name} with an unsupported format. "
+                    f"Currently only Parquet table format is supported."
+                ),
+                consequences="Data load will be aborted.",
+                countermeasures=(
+                    "Make sure that the destination table is stored as parquet: "
+                    "https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_parquet.html"
+                    "#parquet_ddl"
+                ),
+            )