Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Lineage handle copy queries being skipped #14855

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions ingestion/src/metadata/ingestion/lineage/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,30 @@ def intermediate_tables(self) -> List[Table]:
"""
Get a list of intermediate tables
"""
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.intermediate_tables)
if self.parser:
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.intermediate_tables)
return []

@cached_property
def source_tables(self) -> List[Table]:
"""
Get a list of source tables
"""
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.source_tables)
if self.parser:
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.source_tables)
return []

@cached_property
def target_tables(self) -> List[Table]:
"""
Get a list of target tables
"""
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.target_tables)
if self.parser:
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return self.retrieve_tables(self.parser.target_tables)
return []

# pylint: disable=protected-access
@cached_property
Expand All @@ -124,6 +130,8 @@ def column_lineage(self) -> List[Tuple[Column, Column]]:
Get a list of tuples of column lineage
"""
column_lineage = []
if self.parser is None:
return []
try:
if self.parser._dialect == SQLPARSE_DIALECT:
return self.parser.get_column_lineage()
Expand Down Expand Up @@ -331,6 +339,8 @@ def table_joins(self) -> Dict[str, List[TableColumnJoin]]:
:return: for each table name, list all joins against other tables
"""
join_data = defaultdict(list)
if self.parser is None:
return join_data
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
for statement in self.parser.statements():
self.stateful_add_joins_from_statement(join_data, sql_statement=statement)
Expand Down Expand Up @@ -378,7 +388,11 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]:
@staticmethod
def _evaluate_best_parser(
query: str, dialect: Dialect, timeout_seconds: int
) -> LineageRunner:
) -> Optional[LineageRunner]:

if query is None:
return None

@timeout(seconds=timeout_seconds)
def get_sqlfluff_lineage_runner(qry: str, dlct: str) -> LineageRunner:
lr_dialect = LineageRunner(qry, dialect=dlct)
Expand Down
35 changes: 31 additions & 4 deletions ingestion/tests/unit/test_query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def test_ctes_column_lineage(self):
"""
Validate we obtain information from Common Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
WITH cte_table AS (
SELECT
USERS.ID,
Expand All @@ -226,7 +226,6 @@ def test_ctes_column_lineage(self):
;
"""

expected_tables = {"testdb.public.users"}
expected_lineage = [
(
Column("testdb.public.users.id"),
Expand Down Expand Up @@ -258,7 +257,7 @@ def test_table_with_single_comment(self):
"""
Validate we obtain information from Common Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
SELECT
ID,
-- A comment here
Expand Down Expand Up @@ -295,7 +294,7 @@ def test_table_with_aliases(self):
"""
Validate we obtain information from Common Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
SELECT
ID AS new_identifier,
NAME new_name
Expand Down Expand Up @@ -329,3 +328,31 @@ def test_table_with_aliases(self):
parser.column_lineage,
expected_lineage,
)

def test_copy_query(self):
    """
    Check that a COPY statement is handled without raising: the parser
    is expected to report no involved tables and no column lineage,
    both when built without an explicit dialect and when built with
    the MySQL dialect.
    """
    query = """COPY MY_TABLE col1,col2,col3
FROM 's3://bucket/schema/table.csv'
WITH CREDENTIALS ''
REGION 'US-east-2'
"""
    no_lineage = []
    no_tables = set()

    # Same checks for each way of building the parser, in order:
    # first without a dialect argument, then with an explicit one.
    for parser_args in ((query,), (query, Dialect.MYSQL)):
        parser = LineageParser(*parser_args)
        found_tables = set(map(str, parser.involved_tables))
        self.assertEqual(found_tables, no_tables)
        self.assertEqual(parser.column_lineage, no_lineage)
Loading