vdk-trino: collect lineage for select/insert and rename table only (#756)

* vdk-trino: collect lineage for select/insert and rename table only

Why:
Lineage collection needs some improvements
to become more production ready.

What:
To reduce the load on the query engine,
   plans are calculated only for insert/select queries.
For rename table queries, the plan carries no lineage information,
   so the query is parsed and the table names are extracted.
Counting the rows in the output table before and after the query
   is removed to reduce the burden on the query engine.

How has this been tested:
Tweaked the test_vdk_trino_lineage.py test
  to be more comprehensive and cover all scenarios.

What type of change are you making?
Bug fix (non-breaking change which fixes an issue)
  or a cosmetic change/minor improvement

Signed-off-by: Philip Alexiev ([email protected])
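
The selection rule described under "What" can be sketched as follows. This is a minimal illustration, not the code from this commit: the function name `lineage_strategy` and the returned labels are hypothetical, and the first-word check is a simplification (the commit itself asks sqlparse for the statement type).

```python
from typing import Optional


def lineage_strategy(query: str) -> Optional[str]:
    """Return which lineage path a query would take, or None to skip it.

    Hypothetical helper: the labels "parse_rename" and "explain_io" are
    illustrative names, not identifiers used by vdk-trino.
    """
    words = query.strip().split(None, 1)
    first_keyword = words[0].upper() if words else ""
    if first_keyword == "ALTER":
        # The query plan has no table information for renames,
        # so the SQL text itself is parsed.
        return "parse_rename"
    if first_keyword in ("SELECT", "INSERT"):
        # Only these statements cost an extra EXPLAIN round trip.
        return "explain_io"
    # Everything else (CREATE, DROP, ...) is skipped entirely.
    return None
```

Statements that return `None` never reach the query engine for a plan, which is where the reduction in load comes from.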

philip-alexiev authored Mar 15, 2022
1 parent 105a89c commit 15a119c
Showing 6 changed files with 771 additions and 95 deletions.
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-trino/setup.py
@@ -14,7 +14,7 @@
description="Versatile Data Kit SDK plugin provides support for trino database and trino transformation templates.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
-install_requires=["vdk-core", "trino", "tabulate"],
+install_requires=["vdk-core", "trino", "tabulate", "sqlparse"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
include_package_data=True,
74 changes: 69 additions & 5 deletions projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage.py
@@ -1,8 +1,74 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from abc import ABC
-from typing import Any
-from typing import Dict
+from typing import List


class LineageTable:
    """
    Defines the LineageTable object contract
    Attributes
    catalog: str
        A string holding the catalog where the table lives
    schema: str
        A string holding the schema where the table lives
    table: str
        A string holding the table name
    """

    catalog: str
    schema: str
    table: str

    def __init__(self, catalog: str, schema: str, table: str):
        self.catalog = catalog
        self.schema = schema
        self.table = table


class LineageData:
    """
    Defines the LineageData contract
    Attributes
    query: str
        The original query
    query_type: str
        Type of operation (see below for supported values)
    query_status: str
        'OK' or 'EXCEPTION'
    input_tables: List[LineageTable]
        An array of LineageTable objects (see LineageTable)
    output_table: LineageTable
        A LineageTable object (see LineageTable)
    Supported query_type values are:
        - insert
        - select
        - insert_select
        - rename_table
    """

    query: str
    query_type: str
    query_status: str
    input_tables: List[LineageTable]
    output_table: LineageTable

    def __init__(
        self,
        query: str,
        query_type: str,
        query_status: str,
        input_tables: List[LineageTable],
        output_table: LineageTable,
    ):
        self.query = query
        self.query_type = query_type
        self.query_status = query_status
        self.input_tables = input_tables
        self.output_table = output_table


class LineageLogger(ABC):
@@ -11,9 +77,7 @@ class LineageLogger(ABC):
     functionality afforded by vdk-trino.
     """

-    def send(
-        self, lineage_data: Dict[str, Any]
-    ) -> None:  # TODO: implement a common lineage data standard
+    def send(self, lineage_data: LineageData) -> None:
         """
         This method sends the collected lineage data to some lineage data processing application.
130 changes: 130 additions & 0 deletions projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/lineage_utils.py
@@ -0,0 +1,130 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List

from vdk.plugin.trino.lineage import LineageData
from vdk.plugin.trino.lineage import LineageTable


def is_heartbeat_query(query: str):
    import sqlparse
    from sqlparse.tokens import Whitespace

    formatted_query = sqlparse.format(query, reindent=True, keyword_case="upper")
    statement = sqlparse.parse(formatted_query)[0]
    tokens = statement.tokens
    return (
        len(tokens) == 3
        and tokens[0].value == "SELECT"
        and tokens[1].ttype == Whitespace
        and tokens[2].ttype[0] == "Literal"
    )


def get_rename_table_lineage_from_query(
    query: str, schema: str, catalog: str
) -> LineageData:
    """
    This method parses the sql query. If and only if it is a rename table query,
    the method returns lineage data with the source and destination table.
    :param query: The SQL query potentially containing a RENAME TABLE operation
    :param schema: The schema which is queried
    :param catalog: The catalog which is queried
    :return: A LineageData object with the source table as input and the destination table
        as output if it is a RENAME TABLE query, None otherwise.
    """
    import sqlparse
    from sqlparse.tokens import Keyword

    formatted_query = sqlparse.format(query, reindent=True, keyword_case="upper")
    statement = sqlparse.parse(formatted_query)[0]
    keywords = filter(
        lambda token: token.ttype in [Keyword, Keyword.DDL], statement.tokens
    )
    keyword_values = list(map(lambda token: token.value, keywords))

    # Token indexes below count the whitespace tokens between keywords and names.
    if keyword_values == ["ALTER", "TABLE", "RENAME", "TO"]:
        table_from = _lineage_table_from_name(
            statement.tokens[4].value, schema, catalog
        )
        table_to = _lineage_table_from_name(statement.tokens[10].value, schema, catalog)
    elif keyword_values == ["ALTER", "TABLE", "IF", "EXISTS", "RENAME", "TO"]:
        table_from = _lineage_table_from_name(
            statement.tokens[8].value, schema, catalog
        )
        table_to = _lineage_table_from_name(statement.tokens[14].value, schema, catalog)
    else:
        return None
    return LineageData(
        query=query,
        query_type="rename_table",
        query_status="OK",
        input_tables=[table_from],
        output_table=table_to,
    )


def _lineage_table_from_name(
    table_name: str, schema: str, catalog: str
) -> LineageTable:
    tokens = table_name.split(".")
    if len(tokens) == 1:
        return LineageTable(catalog, schema, tokens[0])
    elif len(tokens) == 2:
        return LineageTable(catalog, tokens[0], tokens[1])
    elif len(tokens) == 3:
        return LineageTable(tokens[0], tokens[1], tokens[2])
    else:
        return None


def get_lineage_data_from_io_explain(
    query: str, lineage_result_json_string: str
) -> LineageData:
    """
    Constructs a LineageData object from the result of a Trino query plan
    :param query:
        The query the lineage data will be created for
    :param lineage_result_json_string:
        The exact result of 'EXPLAIN (TYPE IO, FORMAT JSON)'.
        It is a str representation of the JSON response.
    """
    input_tables = None
    output_table = None
    import json

    data = json.loads(lineage_result_json_string)
    if "inputTableColumnInfos" in data and len(data["inputTableColumnInfos"]) > 0:
        input_tables = _get_input_tables_from_explain(data["inputTableColumnInfos"])
    if "outputTable" in data and "schemaTable" in data["outputTable"]:
        output_table = _get_lineage_table_from_plan(data["outputTable"])
    if input_tables and output_table:
        query_type = "insert_select"
    elif input_tables:
        query_type = "select"
    elif output_table:
        query_type = "insert"
    else:
        query_type = "undefined"

    return LineageData(
        query=query,
        query_type=query_type,
        query_status="OK",
        input_tables=input_tables,
        output_table=output_table,
    )


def _get_lineage_table_from_plan(table_dict: dict) -> LineageTable:
    return LineageTable(
        catalog=table_dict["catalog"],
        schema=table_dict["schemaTable"]["schema"],
        table=table_dict["schemaTable"]["table"],
    )


def _get_input_tables_from_explain(input_tables: list) -> List[LineageTable]:
    return list(
        map(_get_lineage_table_from_plan, map(lambda t: t["table"], input_tables))
    )
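
To illustrate how `get_lineage_data_from_io_explain` derives the query type, here is a self-contained sketch that follows the same branching using only the standard library. It assumes a plan containing just the keys the function above reads; the `Table` tuple, the `classify` name, and the sample payload are hypothetical stand-ins, and a real `EXPLAIN (TYPE IO, FORMAT JSON)` result carries more fields than shown. Unlike the function above, the sketch returns an empty list rather than `None` when there are no input tables.

```python
import json
from typing import List, Optional, Tuple

# (catalog, schema, table): a hypothetical stand-in for LineageTable.
Table = Tuple[str, str, str]


def _table(entry: dict) -> Table:
    return (
        entry["catalog"],
        entry["schemaTable"]["schema"],
        entry["schemaTable"]["table"],
    )


def classify(plan_json: str) -> Tuple[str, List[Table], Optional[Table]]:
    """Return (query_type, input_tables, output_table) for an IO plan."""
    plan = json.loads(plan_json)
    inputs = [_table(info["table"]) for info in plan.get("inputTableColumnInfos", [])]
    output = plan.get("outputTable")
    output_table = _table(output) if output and "schemaTable" in output else None
    if inputs and output_table:
        query_type = "insert_select"
    elif inputs:
        query_type = "select"
    elif output_table:
        query_type = "insert"
    else:
        query_type = "undefined"
    return query_type, inputs, output_table


# Hand-written sample holding only the keys read above (not real engine output).
sample_plan = json.dumps(
    {
        "inputTableColumnInfos": [
            {"table": {"catalog": "hive", "schemaTable": {"schema": "staging", "table": "orders"}}}
        ],
        "outputTable": {
            "catalog": "hive",
            "schemaTable": {"schema": "reports", "table": "daily_orders"},
        },
    }
)

query_type, inputs, output_table = classify(sample_plan)
```

A plan with both inputs and an output maps to `insert_select`, a plan with only inputs to `select`, and a plan with only an output to `insert`.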
@@ -1,7 +1,6 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
-import time

from tenacity import before_sleep_log
from tenacity import retry
@@ -84,19 +83,13 @@ def _connect(self):
return conn

     def execute_query(self, query):
-        query_id = str(time.time())
-        lineage_data = self._get_lineage_data(query, query_id)
-        try:
-            res = self.execute_query_with_retries(query)
-            self._send_query_telemetry(query, query_id)
-            self.add_num_rows_after_query(lineage_data, res)
-            return res
-        except Exception as e:
-            self._send_query_telemetry(query, query_id, e)
-            raise
-        finally:
-            if self._lineage_logger and lineage_data:
+        res = self.execute_query_with_retries(query)
+        if self._lineage_logger:
+            lineage_data = self._get_lineage_data(query)
+            if lineage_data:
                 self._lineage_logger.send(lineage_data)
+        # TODO: collect lineage for failed query
+        return res

@retry(
stop=stop_after_attempt(5),
@@ -109,72 +102,46 @@ def execute_query_with_retries(self, query):
         res = super().execute_query(query)
         return res

-    def add_num_rows_after_query(self, lineage_data, res):
-        if lineage_data:
-            lineage_data["output_num_rows_after"] = self._get_output_table_num_rows(
-                lineage_data
+    def _get_lineage_data(self, query):
+
+        from vdk.plugin.trino import lineage_utils
+        import sqlparse
+
+        statement = sqlparse.parse(query)[0]
+
+        if statement.get_type() == "ALTER":
+            rename_table_lineage = lineage_utils.get_rename_table_lineage_from_query(
+                query, self._schema, self._catalog
             )
-            if "outputTable" in lineage_data:
-                if res and res[0] and res[0][0]:
-                    lineage_data["output_num_rows_updated"] = res[0][0]
+            if rename_table_lineage:
+                log.debug("Collecting lineage for rename table operation ...")
+                return rename_table_lineage
+            else:
+                log.debug(
+                    "ALTER operation not a RENAME TABLE operation. No lineage will be collected."
+                )

-    def _get_lineage_data(self, query, query_id):
-        if self._lineage_logger:
+        elif statement.get_type() == "SELECT" or statement.get_type() == "INSERT":
+            if lineage_utils.is_heartbeat_query(query):
+                return None
+            log.debug("Collecting lineage for SELECT/INSERT query ...")
             try:
-                with closing_noexcept_on_close(self.__cursor()) as cur:
+                with closing_noexcept_on_close(self._cursor()) as cur:
                     cur.execute(f"EXPLAIN (TYPE IO, FORMAT JSON) {query}")
                     result = cur.fetchall()
                     if result:
-                        import json
-
-                        data = json.loads(result[0][0])
-                        data["@type"] = "taurus_query_io"
-                        data["query"] = query
-                        data["@id"] = query_id
-                        data[
-                            "output_num_rows_before"
-                        ] = self._get_output_table_num_rows(data)
-                        return data
+                        return lineage_utils.get_lineage_data_from_io_explain(
+                            query, result[0][0]
+                        )
             except Exception as e:
                 log.info(
                     f"Failed to get query io details for telemetry: {e}. Will continue with query execution"
                 )
                 return None
+        else:
+            log.debug(
+                "Unsupported query type for lineage collection. Will not collect lineage."
+            )
         return None

-    def _get_output_table_num_rows(self, data):
-        if data and "outputTable" in data:
-            try:
-                outputTable = data["outputTable"]
-                catalog = outputTable["catalog"]
-                schema = outputTable["schemaTable"]["schema"]
-                table = outputTable["schemaTable"]["table"]
-                # TODO: escape potential reserved names
-                with closing_noexcept_on_close(self.__cursor()) as cur:
-                    cur.execute(  # nosec
-                        f"select count(1) from {catalog}.{schema}.{table}"
-                    )
-                    res = cur.fetchall()
-                    if res:
-                        return res[0][0]
-                    else:
-                        return None
-            except Exception as e:
-                log.info(
-                    f"Failed to get output table details: {e}. Will continue with query processing as normal"
-                )
-                return None
-        return None

-    def _send_query_telemetry(self, query, query_id, exception=None):
-        if self._lineage_logger:
-            try:
-                data = dict()
-                data["@type"] = "taurus_query"
-                data["query"] = query
-                data["@id"] = query_id
-                data["status"] = "OK" if exception is None else "EXCEPTION"
-                if exception:
-                    data["error_message"] = str(exception)
-                self._lineage_logger.send(data)
-            except:
-                log.exception("Failed to send query details as telemetry.")
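
The new `execute_query` flow (run the query first, then collect and send lineage only on success) can be exercised in isolation with a small sketch. Everything here is hypothetical scaffolding for illustration: `InMemoryLineageLogger`, `execute_with_lineage`, and the callables passed in are not part of vdk-trino, and the logger only mimics the shape of `LineageLogger.send`.

```python
from typing import Any, Callable, List, Optional


class InMemoryLineageLogger:
    """Hypothetical logger that keeps whatever it is sent, e.g. for tests."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def send(self, lineage_data: Any) -> None:
        self.sent.append(lineage_data)


def execute_with_lineage(
    query: str,
    run_query: Callable[[str], Any],
    collect_lineage: Callable[[str], Optional[Any]],
    lineage_logger: Optional[InMemoryLineageLogger] = None,
) -> Any:
    # If run_query raises, the exception propagates and nothing is sent,
    # matching the "TODO: collect lineage for failed query" note in the diff.
    result = run_query(query)
    if lineage_logger is not None:
        lineage_data = collect_lineage(query)
        if lineage_data:
            lineage_logger.send(lineage_data)
    return result


logger = InMemoryLineageLogger()
rows = execute_with_lineage(
    "insert into b select * from a",
    run_query=lambda q: [[3]],
    collect_lineage=lambda q: {"query": q, "query_type": "insert_select"},
    lineage_logger=logger,
)
```

Because lineage is collected after the query runs, a failing query no longer produces a telemetry record, which is the behavior change to keep in mind when comparing against the removed `_send_query_telemetry` path.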