From 15a119c5f64064d4b36713b34938c6ffe5901f32 Mon Sep 17 00:00:00 2001 From: Philip Alexiev Date: Tue, 15 Mar 2022 16:48:36 +0200 Subject: [PATCH] vdk-trino: collect lineage for select/insert and rename table only (#756) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) * vdk-trino: collect lineage for select/insert and rename table only Why: To make lineage collecting more production ready, some improvements are needed. What: In order to reduce the load on the query engine, only plans for insert/select queries are calculated. For rename table queries, the plan doesn't give information. The query is parsed and table names extracted. Counting the number of rows in the output table before and after is removed to reduce the burden on the query engine. How has this been tested: Tweaked the test_vdk_trino_lineage.py test to be more comprehensive and cover all scenarios. What type of change are you making? Bug fix (non-breaking change which fixes an issue) or a cosmetic change/minor improvement Signed-off-by: Philip Alexiev (palexiev@vmware.com) --- projects/vdk-plugins/vdk-trino/setup.py | 2 +- .../vdk-trino/src/vdk/plugin/trino/lineage.py | 74 +++- .../src/vdk/plugin/trino/lineage_utils.py | 130 +++++++ .../src/vdk/plugin/trino/trino_connection.py | 105 ++---- .../vdk-trino/tests/test_vdk_trino_lineage.py | 347 +++++++++++++++++- .../tests/test_vdk_trino_lineage_utils.py | 208 +++++++++++ 6 files changed, 771 insertions(+), 95 deletions(-) create mode 100644 projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage_utils.py create mode 100644 projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage_utils.py diff --git a/projects/vdk-plugins/vdk-trino/setup.py b/projects/vdk-plugins/vdk-trino/setup.py index de52a01c64..0418ecc0e7 100644 --- a/projects/vdk-plugins/vdk-trino/setup.py +++ b/projects/vdk-plugins/vdk-trino/setup.py @@ -14,7 +14,7 @@ description="Versatile Data Kit SDK plugin provides support for trino database and trino transformation templates.", long_description=pathlib.Path("README.md").read_text(), long_description_content_type="text/markdown", - install_requires=["vdk-core", "trino", "tabulate"], + install_requires=["vdk-core", "trino", "tabulate", "sqlparse"], package_dir={"": "src"}, packages=setuptools.find_namespace_packages(where="src"), include_package_data=True, diff --git a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage.py b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage.py index 2e458d3835..3869aad48b 100644 --- a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage.py +++ b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage.py @@ -1,8 +1,74 @@ # Copyright 2021 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 from abc import ABC -from typing import Any -from typing import Dict +from typing import List + + +class LineageTable: + """ + Defines the LineageTable object contract + + Attributes + catalog: str + A string holding the catalog where the table lives + schema: str + A string holding the schema where the table lives + table: str + A string holding the table name + """ + + catalog: str + schema: str + table: str + + def __init__(self, catalog: str, schema: str, table: str): + self.catalog = catalog + self.schema = schema + self.table = table + + +class LineageData: + """ + Defines the LineageData contract + + Attributes + query: str + The original query + query_type: str + Type of operation (see below for supported values) + query_status: str + 'OK' or 'EXCEPTION' + input_tables: List[LineageTable] + An array of LineageTable objects (see LineageTable) + output_table: LineageTable + A LineageTable object (see LineageTable) + + Supported query_type values are: + - insert + - select + - insert_select + - rename_table + """ + + query: str + query_type: str + query_status: str + input_tables: List[LineageTable] + output_table: LineageTable + + def __init__( + self, + query: str, + query_type: str, + query_status: str, + input_tables: List[LineageTable], + output_table: LineageTable, + ): + self.query = query + self.query_type = query_type + self.query_status = query_status + self.input_tables = input_tables + self.output_table = output_table class LineageLogger(ABC): @@ -11,9 +77,7 @@ class LineageLogger(ABC): functionality afforded by vdk-trino. """ - def send( - self, lineage_data: Dict[str, Any] - ) -> None: # TODO: implement a common lineage data standard + def send(self, lineage_data: LineageData) -> None: """ This method sends the collected lineage data to some lineage data processing application. diff --git a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage_utils.py b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage_utils.py new file mode 100644 index 0000000000..4811a6da4f --- /dev/null +++ b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage_utils.py @@ -0,0 +1,130 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from typing import List + +from vdk.plugin.trino.lineage import LineageData +from vdk.plugin.trino.lineage import LineageTable + + +def is_heartbeat_query(query: str): + import sqlparse + from sqlparse.tokens import Whitespace + + formatted_query = sqlparse.format(query, reindent=True, keyword_case="upper") + statement = sqlparse.parse(formatted_query)[0] + tokens = statement.tokens + return ( + len(tokens) == 3 + and tokens[0].value == "SELECT" + and tokens[1].ttype == Whitespace + and tokens[2].ttype[0] == "Literal" + ) + + +def get_rename_table_lineage_from_query( + query: str, schema: str, catalog: str +) -> LineageData: + """ + This method parses the sql query. If and only if it is a rename table query, + the method returns the names of the source and destination table. + :param query: The SQL query potentially containing a RENAME TABLE operation + :param schema: The schema which is queried + :param catalog: The catalog which is queried + :return: A tuple with (table_from, table_to) if it is a RENAME TABLE query, None otherwise. + """ + import sqlparse + from sqlparse.tokens import Keyword + + formatted_query = sqlparse.format(query, reindent=True, keyword_case="upper") + statement = sqlparse.parse(formatted_query)[0] + keywords = filter( + lambda token: token.ttype in [Keyword, Keyword.DDL], statement.tokens + ) + keyword_values = list(map(lambda token: token.value, keywords)) + + if keyword_values == ["ALTER", "TABLE", "RENAME", "TO"]: + table_from = _lineage_table_from_name( + statement.tokens[4].value, schema, catalog + ) + table_to = _lineage_table_from_name(statement.tokens[10].value, schema, catalog) + elif keyword_values == ["ALTER", "TABLE", "IF", "EXISTS", "RENAME", "TO"]: + table_from = _lineage_table_from_name( + statement.tokens[8].value, schema, catalog + ) + table_to = _lineage_table_from_name(statement.tokens[14].value, schema, catalog) + else: + return None + return LineageData( + query=query, + query_type="rename_table", + query_status="OK", + input_tables=[table_from], + output_table=table_to, + ) + + +def _lineage_table_from_name( + table_name: str, schema: str, catalog: str +) -> LineageTable: + tokens = table_name.split(".") + if len(tokens) == 1: + return LineageTable(catalog, schema, tokens[0]) + elif len(tokens) == 2: + return LineageTable(catalog, tokens[0], tokens[1]) + elif len(tokens) == 3: + return LineageTable(tokens[0], tokens[1], tokens[2]) + else: + return None + + +def get_lineage_data_from_io_explain( + query: str, lineage_result_json_string: str +) -> LineageData: + """ + Constructs a LineageData object from the result of a Trino query plan + + :param query: + The query the lineage data will be created for + :param lineage_result_json_string: + The exact result of 'EXPLAIN (TYPE IO, FORMAT JSON)'. + It is a str representation of the JSON response. + """ + input_tables = None + output_table = None + import json + + data = json.loads(lineage_result_json_string) + if "inputTableColumnInfos" in data and len(data["inputTableColumnInfos"]) > 0: + input_tables = _get_input_tables_from_explain(data["inputTableColumnInfos"]) + if "outputTable" in data and "schemaTable" in data["outputTable"]: + output_table = _get_lineage_table_from_plan(data["outputTable"]) + if input_tables and output_table: + query_type = "insert_select" + elif input_tables: + query_type = "select" + elif output_table: + query_type = "insert" + else: + query_type = "undefined" + + return LineageData( + query=query, + query_type=query_type, + query_status="OK", + input_tables=input_tables, + output_table=output_table, + ) + + +def _get_lineage_table_from_plan(table_dict: dict) -> LineageTable: + return LineageTable( + catalog=table_dict["catalog"], + schema=table_dict["schemaTable"]["schema"], + table=table_dict["schemaTable"]["table"], + ) + + +def _get_input_tables_from_explain(input_tables: list) -> List[LineageTable]: + return list( + map(_get_lineage_table_from_plan, map(lambda t: t["table"], input_tables)) + ) diff --git a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_connection.py b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_connection.py index 22d0f40d9c..4b5bebef03 100644 --- a/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_connection.py +++ b/projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/trino_connection.py @@ -1,7 +1,6 @@ # Copyright 2021 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 import logging -import time from tenacity import before_sleep_log from tenacity import retry @@ -84,19 +83,13 @@ def _connect(self): return conn def execute_query(self, query): - query_id = str(time.time()) - lineage_data = self._get_lineage_data(query, query_id) - try: - res = self.execute_query_with_retries(query) - self._send_query_telemetry(query, query_id) - self.add_num_rows_after_query(lineage_data, res) - return res - except Exception as e: - self._send_query_telemetry(query, query_id, e) - raise - finally: - if self._lineage_logger and lineage_data: + res = self.execute_query_with_retries(query) + if self._lineage_logger: + lineage_data = self._get_lineage_data(query) + if lineage_data: self._lineage_logger.send(lineage_data) + # TODO: collect lineage for failed query + return res @retry( stop=stop_after_attempt(5), @@ -109,72 +102,46 @@ def execute_query_with_retries(self, query): res = super().execute_query(query) return res - def add_num_rows_after_query(self, lineage_data, res): - if lineage_data: - lineage_data["output_num_rows_after"] = self._get_output_table_num_rows( - lineage_data + def _get_lineage_data(self, query): + + from vdk.plugin.trino import lineage_utils + import sqlparse + + statement = sqlparse.parse(query)[0] + + if statement.get_type() == "ALTER": + rename_table_lineage = lineage_utils.get_rename_table_lineage_from_query( + query, self._schema, self._catalog ) - if "outputTable" in lineage_data: - if res and res[0] and res[0][0]: - lineage_data["output_num_rows_updated"] = res[0][0] + if rename_table_lineage: + log.debug("Collecting lineage for rename table operation ...") + return rename_table_lineage + else: + log.debug( + "ALTER operation not a RENAME TABLE operation. No lineage will be collected." + ) - def _get_lineage_data(self, query, query_id): - if self._lineage_logger: + elif statement.get_type() == "SELECT" or statement.get_type() == "INSERT": + if lineage_utils.is_heartbeat_query(query): + return None + log.debug("Collecting lineage for SELECT/INSERT query ...") try: - with closing_noexcept_on_close(self.__cursor()) as cur: + with closing_noexcept_on_close(self._cursor()) as cur: cur.execute(f"EXPLAIN (TYPE IO, FORMAT JSON) {query}") result = cur.fetchall() if result: - import json - - data = json.loads(result[0][0]) - data["@type"] = "taurus_query_io" - data["query"] = query - data["@id"] = query_id - data[ - "output_num_rows_before" - ] = self._get_output_table_num_rows(data) - return data + return lineage_utils.get_lineage_data_from_io_explain( + query, result[0][0] + ) except Exception as e: log.info( f"Failed to get query io details for telemetry: {e}. Will continue with query execution" ) return None + else: + log.debug( + "Unsupported query type for lineage collection. Will not collect lineage." + ) + return None - def _get_output_table_num_rows(self, data): - if data and "outputTable" in data: - try: - outputTable = data["outputTable"] - catalog = outputTable["catalog"] - schema = outputTable["schemaTable"]["schema"] - table = outputTable["schemaTable"]["table"] - # TODO: escape potential reserved names - with closing_noexcept_on_close(self.__cursor()) as cur: - cur.execute( # nosec - f"select count(1) from {catalog}.{schema}.{table}" - ) - res = cur.fetchall() - if res: - return res[0][0] - else: - return None - except Exception as e: - log.info( - f"Failed to get output table details: {e}. Will continue with query processing as normal" - ) - return None return None - - def _send_query_telemetry(self, query, query_id, exception=None): - if self._lineage_logger: - try: - data = dict() - data["@type"] = "taurus_query" - data["query"] = query - data["@id"] = query_id - data["status"] = "OK" if exception is None else "EXCEPTION" - if exception: - data["error_message"] = str(exception) - self._lineage_logger.send(data) - except: - log.exception("Failed to send query details as telemetry.") diff --git a/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage.py b/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage.py index 28a0d1054f..c6a547694c 100644 --- a/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage.py +++ b/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage.py @@ -1,14 +1,16 @@ # Copyright 2021 VMware, Inc. # SPDX-License-Identifier: Apache-2.0 import os -import re +import uuid from unittest import mock import pytest from click.testing import Result from vdk.api.plugin.hook_markers import hookimpl +from vdk.plugin.test_utils.util_funcs import cli_assert_equal from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner from vdk.plugin.trino import trino_plugin +from vdk.plugin.trino.lineage import LineageData from vdk.plugin.trino.lineage import LineageLogger from vdk.plugin.trino.trino_plugin import LINEAGE_LOGGER_KEY @@ -17,6 +19,75 @@ VDK_TRINO_USE_SSL = "VDK_TRINO_USE_SSL" +class TestConfigPlugin: + def __init__(self, lineage_logger: LineageLogger): + self.lineage_logger = lineage_logger + + @hookimpl + def vdk_initialize(self, context): + context.state.set(LINEAGE_LOGGER_KEY, self.lineage_logger) + + +def execute_query(runner, query: str): + result: Result = runner.invoke(["trino-query", "--query", query]) + cli_assert_equal(0, result) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_not_collected_for_heartbeat_query(): + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + execute_query(runner, "select 1") + assert not mock_lineage_logger.send.called + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_insert(): + table_name = "test_table_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create table {table_name} (test_column varchar)") + + insert_query = f"insert into {table_name} values('test value')" + execute_query(runner, insert_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class InsertLineageDataMatcher: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == insert_query + assert lineage_data.query_type == "insert" + assert lineage_data.query_status == "OK" + assert lineage_data.output_table.catalog == "memory" + assert lineage_data.output_table.schema == "default" + assert lineage_data.output_table.table == table_name + return True + + mock_lineage_logger.send.assert_called_with(InsertLineageDataMatcher()) + + @pytest.mark.usefixtures("trino_service") @mock.patch.dict( os.environ, @@ -26,27 +97,263 @@ VDK_TRINO_USE_SSL: "False", }, ) -def test_lineage(): - mock_lineage = mock.MagicMock(LineageLogger) +def test_lineage_for_select(): - class TestConfigPlugin: - @hookimpl - def vdk_initialize(self, context): - context.state.set(LINEAGE_LOGGER_KEY, mock_lineage) + table_name = "test_table_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create table {table_name} (test_column varchar)") + + select_query = f"select * from {table_name}" + execute_query(runner, select_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class SelectLineageDataMatcher: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == select_query + assert lineage_data.query_type == "select" + assert lineage_data.query_status == "OK" + assert len(lineage_data.input_tables) == 1 + assert lineage_data.input_tables[0].catalog == "memory" + assert lineage_data.input_tables[0].schema == "default" + assert lineage_data.input_tables[0].table == table_name + return True + + mock_lineage_logger.send.assert_called_with(SelectLineageDataMatcher()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_insert_select(): + table_name_source = "test_tbl_src_" + uuid.uuid4().hex + table_name_dest = "test_tbl_dst_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create table {table_name_source} (test_column varchar)") + execute_query(runner, f"create table {table_name_dest} (test_column varchar)") + + insert_select_query = ( + f"insert into {table_name_dest} select * from {table_name_source}" + ) + execute_query(runner, insert_select_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class InsertSelectLineageDataMatch: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == insert_select_query + assert lineage_data.query_type == "insert_select" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables[0].table == table_name_source + assert lineage_data.output_table.table == table_name_dest + return True + + mock_lineage_logger.send.assert_called_with(InsertSelectLineageDataMatch()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_insert_select_full_names(): + test_schema = "memory.test_schema" + table_name_source = "test_tbl_src_" + uuid.uuid4().hex + table_name_dest = "test_tbl_dst_" + uuid.uuid4().hex - runner = CliEntryBasedTestRunner(TestConfigPlugin(), trino_plugin) + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create schema if not exists {test_schema}") + execute_query( + runner, f"create table {test_schema}.{table_name_source} (test_column varchar)" + ) + execute_query( + runner, f"create table {test_schema}.{table_name_dest} (test_column varchar)" + ) + + insert_select_query = f"insert into {test_schema}.{table_name_dest} select * from {test_schema}.{table_name_source}" + execute_query(runner, insert_select_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class InsertSelectLineageDataMatch: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == insert_select_query + assert lineage_data.query_type == "insert_select" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables[0].catalog == "memory" + assert lineage_data.input_tables[0].schema == "test_schema" + assert lineage_data.input_tables[0].table == table_name_source + assert lineage_data.output_table.catalog == "memory" + assert lineage_data.output_table.schema == "test_schema" + assert lineage_data.output_table.table == table_name_dest + return True + + mock_lineage_logger.send.assert_called_with(InsertSelectLineageDataMatch()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_rename_table(): + table_name_from = "test_table_" + uuid.uuid4().hex + table_name_to = "test_table_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create table {table_name_from} (test_column varchar)") + + rename_query = f"alter table {table_name_from} rename to {table_name_to}" + execute_query(runner, rename_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class RenameTableLineageDataMatch: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == rename_query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables[0].table == table_name_from + assert lineage_data.output_table.table == table_name_to + return True + + mock_lineage_logger.send.assert_called_with(RenameTableLineageDataMatch()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_rename_table_full_names(): + test_schema = "memory.test_schema" + table_name_from = "test_table_" + uuid.uuid4().hex + table_name_to = "test_table_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create schema if not exists {test_schema}") + execute_query( + runner, f"create table {test_schema}.{table_name_from} (test_column varchar)" + ) + + rename_query = f"alter table {test_schema}.{table_name_from} rename to {test_schema}.{table_name_to}" + execute_query(runner, rename_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class RenameTableLineageDataMatch: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == rename_query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables[0].catalog == "memory" + assert lineage_data.input_tables[0].schema == "test_schema" + assert lineage_data.input_tables[0].table == table_name_from + assert lineage_data.output_table.catalog == "memory" + assert lineage_data.output_table.schema == "test_schema" + assert lineage_data.output_table.table == table_name_to + return True + + mock_lineage_logger.send.assert_called_with(RenameTableLineageDataMatch()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_for_rename_table_if_exists(): + table_name_from = "test_table_" + uuid.uuid4().hex + table_name_to = "test_table_" + uuid.uuid4().hex + + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) + + execute_query(runner, f"create table {table_name_from} (test_column varchar)") + + rename_query = ( + f"alter table if exists {table_name_from} rename to {table_name_to}" + ) + execute_query(runner, rename_query) + + # the lineage data is different on every run of the test, + # so we need this class to generalize the dict which is to be matched + class RenameTableLineageDataMatch: + def __eq__(self, lineage_data: LineageData): + assert lineage_data.query == rename_query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables[0].table == table_name_from + assert lineage_data.output_table.table == table_name_to + return True + + mock_lineage_logger.send.assert_called_with(RenameTableLineageDataMatch()) + + +@pytest.mark.usefixtures("trino_service") +@mock.patch.dict( + os.environ, + { + VDK_DB_DEFAULT_TYPE: "TRINO", + VDK_TRINO_PORT: "8080", + VDK_TRINO_USE_SSL: "False", + }, +) +def test_lineage_not_collected_for_some_queries(): + table_name = "test_table_" + uuid.uuid4().hex - result: Result = runner.invoke(["trino-query", "--query", "SELECT 1"]) + mock_lineage_logger = mock.MagicMock(LineageLogger) + runner = CliEntryBasedTestRunner( + TestConfigPlugin(mock_lineage_logger), trino_plugin + ) - class LineageDataMatch: # the lineage data is different on every run of the test, - # so we need this class to generalize the dict which is to be matched - def __eq__(self, lineage_data): - return ( - lineage_data.keys() == {"@type", "query", "@id", "status"} - and lineage_data["@type"] == "taurus_query" - and lineage_data["status"] == "OK" - and lineage_data["query"] == "SELECT 1" - and re.fullmatch(r"[0-9]{10}\.[0-9]+", lineage_data["@id"]) - ) + execute_query(runner, f"create table {table_name} (test_column varchar)") - mock_lineage.send.assert_called_with(LineageDataMatch()) + execute_query(runner, f"describe {table_name}") + assert not mock_lineage_logger.send.called diff --git a/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage_utils.py b/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage_utils.py new file mode 100644 index 0000000000..ed5dc2a533 --- /dev/null +++ b/projects/vdk-plugins/vdk-trino/tests/test_vdk_trino_lineage_utils.py @@ -0,0 +1,208 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import json + +from vdk.plugin.trino.lineage_utils import _get_input_tables_from_explain +from vdk.plugin.trino.lineage_utils import _get_lineage_table_from_plan +from vdk.plugin.trino.lineage_utils import _lineage_table_from_name +from vdk.plugin.trino.lineage_utils import get_rename_table_lineage_from_query +from vdk.plugin.trino.lineage_utils import is_heartbeat_query + + +def test_is_heartbeat_query(): + assert is_heartbeat_query("select 1") + assert is_heartbeat_query("select 'aaa'") + assert not is_heartbeat_query("select * from a_table") + + +def test_lineage_table_from_name(): + lineage_table = _lineage_table_from_name( + table_name="test_table", schema="default_schema", catalog="default_catalog" + ) + assert lineage_table.catalog == "default_catalog" + assert lineage_table.schema == "default_schema" + assert lineage_table.table == "test_table" + + +def test_lineage_table_from_name_and_schema(): + lineage_table = _lineage_table_from_name( + table_name="test_schema.test_table", + schema="default_schema", + catalog="default_catalog", + ) + assert lineage_table.catalog == "default_catalog" + assert lineage_table.schema == "test_schema" + assert lineage_table.table == "test_table" + + +def test_lineage_table_from_name_and_schema_and_catalog(): + lineage_table = _lineage_table_from_name( + table_name="test_catalog.test_schema.test_table", + schema="default_schema", + catalog="default_catalog", + ) + assert lineage_table.catalog == "test_catalog" + assert lineage_table.schema == "test_schema" + assert lineage_table.table == "test_table" + + +def test_get_lineage_table_from_plan(): + table_dict = json.loads( + """ + { + "catalog": "test_catalog", + "schemaTable": { + "schema": "test_schema", + "table": "test_table" + } + } + """ + ) + lineage_table = _get_lineage_table_from_plan(table_dict) + assert lineage_table.catalog == "test_catalog" + assert lineage_table.schema == "test_schema" + assert lineage_table.table == "test_table" + + +def test_get_input_tables_from_explain(): + explain_io_json = """ + { + "inputTableColumnInfos" : [ { + "table" : { + "catalog" : "hive", + "schemaTable" : { + "schema" : "history", + "table" : "palexiev2" + } + }, + "columnConstraints" : [ ], + "estimate" : { + "outputRowCount" : 0.0, + "outputSizeInBytes" : 0.0, + "cpuCost" : 0.0, + "maxMemory" : 0.0, + "networkCost" : 0.0 + } + }, { + "table" : { + "catalog" : "hive", + "schemaTable" : { + "schema" : "history", + "table" : "palexiev" + } + }, + "columnConstraints" : [ ], + "estimate" : { + "outputRowCount" : 0.0, + "outputSizeInBytes" : 0.0, + "cpuCost" : 0.0, + "maxMemory" : 0.0, + "networkCost" : 0.0 + } + } ], + "estimate" : { + "outputRowCount" : 0.0, + "outputSizeInBytes" : 0.0, + "cpuCost" : 0.0, + "maxMemory" : 0.0, + "networkCost" : 0.0 + } + } + """ + explain_dict = json.loads(explain_io_json) + lineage_tables = _get_input_tables_from_explain( + explain_dict["inputTableColumnInfos"] + ) + + table1 = lineage_tables[0] + assert table1.catalog == "hive" + assert table1.schema == "history" + assert table1.table == "palexiev2" + + table2 = lineage_tables[1] + assert table2.catalog == "hive" + assert table2.schema == "history" + assert table2.table == "palexiev" + + +def test_get_rename_table_lineage_from_query(): + query = "alter table tbl_from rename to tbl_to" + lineage_data = get_rename_table_lineage_from_query( + query, "test_schema", "test_catalog" + ) + assert lineage_data is not None + assert lineage_data.query == query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables is not None + assert len(lineage_data.input_tables) == 1 + assert lineage_data.input_tables[0].table == "tbl_from" + assert lineage_data.input_tables[0].schema == "test_schema" + assert lineage_data.input_tables[0].catalog == "test_catalog" + assert lineage_data.output_table is not None + assert lineage_data.output_table.table == "tbl_to" + assert lineage_data.output_table.schema == "test_schema" + assert lineage_data.output_table.catalog == "test_catalog" + + +def test_get_rename_table_lineage_from_query_with_schema(): + query = "alter table test_schema.tbl_from rename to test_schema.tbl_to" + lineage_data = get_rename_table_lineage_from_query( + query, "wrong_schema", "test_catalog" + ) + assert lineage_data is not None + assert lineage_data.query == query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables is not None + assert len(lineage_data.input_tables) == 1 + assert lineage_data.input_tables[0].table == "tbl_from" + assert lineage_data.input_tables[0].schema == "test_schema" + assert lineage_data.input_tables[0].catalog == "test_catalog" + assert lineage_data.output_table is not None + assert lineage_data.output_table.table == "tbl_to" + assert lineage_data.output_table.schema == "test_schema" + assert lineage_data.output_table.catalog == "test_catalog" + + +def test_get_rename_table_lineage_from_query_full_names(): + query = "alter table test_catalog.test_schema.tbl_from rename to test_catalog.test_schema.tbl_to" + lineage_data = get_rename_table_lineage_from_query( + query, "wrong_schema", "wrong_catalog" + ) + assert lineage_data is not None + assert lineage_data.query == query + assert lineage_data.query_type == "rename_table" + assert lineage_data.query_status == "OK" + assert lineage_data.input_tables is not None + assert len(lineage_data.input_tables) == 1 + assert lineage_data.input_tables[0].table == "tbl_from" + assert lineage_data.input_tables[0].schema == "test_schema" + assert lineage_data.input_tables[0].catalog == "test_catalog" + assert lineage_data.output_table is not None + assert lineage_data.output_table.table == "tbl_to" + assert lineage_data.output_table.schema == "test_schema" + assert lineage_data.output_table.catalog == "test_catalog" + + +def test_get_rename_table_lineage_from_query_false_cases(): + assert ( + get_rename_table_lineage_from_query( + "alter table tbl1 add column col1 int", "test_schema", "test_catalog" + ) + is None + ) + assert ( + get_rename_table_lineage_from_query( + "alter table tbl1 rename column col1 to col2", + "test_schema", + "test_catalog", + ) + is None + ) + assert ( + get_rename_table_lineage_from_query( + "alter view view1 rename to view2", "test_schema", "test_catalog" + ) + is None + )