Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed table_type for GOVERNED tables #661

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
)
from dbt.adapters.athena.python_submissions import AthenaPythonJobHelper
from dbt.adapters.athena.relation import (
RELATION_TYPE_MAP,
AthenaRelation,
AthenaSchemaSearchMap,
TableType,
Expand Down Expand Up @@ -542,12 +541,13 @@ def _s3_path_exists(self, s3_bucket: str, s3_prefix: str) -> bool:
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
return True if "Contents" in response else False

def _get_one_table_for_catalog(self, table: TableTypeDef, database: str) -> List[Dict[str, Any]]:
@staticmethod
def _get_one_table_for_catalog(table: TableTypeDef, database: str) -> List[Dict[str, Any]]:
table_catalog = {
"table_database": database,
"table_schema": table["DatabaseName"],
"table_name": table["Name"],
"table_type": RELATION_TYPE_MAP[table.get("TableType", "EXTERNAL_TABLE")].value,
"table_type": get_table_type(table).value,
"table_comment": table.get("Parameters", {}).get("comment", table.get("Description", "")),
}
return [
Expand All @@ -563,14 +563,13 @@ def _get_one_table_for_catalog(self, table: TableTypeDef, database: str) -> List
for idx, col in enumerate(table["StorageDescriptor"]["Columns"] + table.get("PartitionKeys", []))
]

def _get_one_table_for_non_glue_catalog(
self, table: TableTypeDef, schema: str, database: str
) -> List[Dict[str, Any]]:
@staticmethod
def _get_one_table_for_non_glue_catalog(table: TableTypeDef, schema: str, database: str) -> List[Dict[str, Any]]:
table_catalog = {
"table_database": database,
"table_schema": schema,
"table_name": table["Name"],
"table_type": RELATION_TYPE_MAP[table.get("TableType", "EXTERNAL_TABLE")].value,
"table_type": get_table_type(table).value,
"table_comment": table.get("Parameters", {}).get("comment", ""),
}
return [
Expand All @@ -583,6 +582,7 @@ def _get_one_table_for_non_glue_catalog(
"column_comment": col.get("Comment", ""),
},
}
# TODO: review this lookup: TableTypeDef does not define a top-level "Columns" key (the Glue path above reads table["StorageDescriptor"]["Columns"])
for idx, col in enumerate(table["Columns"] + table.get("PartitionKeys", []))
]

Expand Down
19 changes: 12 additions & 7 deletions dbt/adapters/athena/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def add(self, relation: AthenaRelation) -> None:
RELATION_TYPE_MAP = {
"EXTERNAL_TABLE": TableType.TABLE,
"EXTERNAL": TableType.TABLE, # type returned by federated query tables
"GOVERNED": TableType.TABLE,
"MANAGED_TABLE": TableType.TABLE,
"VIRTUAL_VIEW": TableType.VIEW,
"table": TableType.TABLE,
Expand All @@ -91,16 +92,20 @@ def add(self, relation: AthenaRelation) -> None:


def get_table_type(table: TableTypeDef) -> TableType:
    """Resolve the adapter ``TableType`` for a catalog table definition.

    Resolution order:

    1. A non-empty ``TableType`` that is not a key of ``RELATION_TYPE_MAP``
       is rejected, even if the table is flagged as Iceberg.
    2. A table whose ``Parameters["table_type"]`` equals ``"iceberg"``
       (case-insensitive) is reported as ``TableType.ICEBERG``; this also
       applies when ``TableType`` itself is missing.
    3. Otherwise ``TableType`` must be present and is translated through
       ``RELATION_TYPE_MAP``.

    Args:
        table: Table definition; ``Name`` is required, while ``CatalogId``,
            ``DatabaseName``, ``TableType`` and ``Parameters`` are optional.

    Returns:
        The resolved ``TableType`` member.

    Raises:
        ValueError: If ``TableType`` is unsupported, or if it is missing or
            empty and the table is not flagged as Iceberg.
    """
    # Build "<catalog>.<database>.<name>" for messages, skipping absent parts.
    table_full_name = ".".join(filter(None, [table.get("CatalogId"), table.get("DatabaseName"), table["Name"]]))

    input_table_type = table.get("TableType")
    if input_table_type and input_table_type not in RELATION_TYPE_MAP:
        raise ValueError(f"Table type {input_table_type} is not supported for table {table_full_name}")

    if table.get("Parameters", {}).get("table_type", "").lower() == "iceberg":
        _type = TableType.ICEBERG
    elif not input_table_type:
        raise ValueError(f"Table type cannot be None for table {table_full_name}")
    else:
        _type = RELATION_TYPE_MAP[input_table_type]

    LOGGER.debug(f"table_name : {table_full_name}")
    LOGGER.debug(f"table type : {_type}")

    return _type
13 changes: 5 additions & 8 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,16 +414,11 @@ def test__get_one_catalog(self, mock_aws_service):
mock_aws_service.create_database("baz")
mock_aws_service.create_table(table_name="bar", database_name="foo")
mock_aws_service.create_table(table_name="bar", database_name="quux")
mock_aws_service.create_table_without_type(table_name="qux", database_name="baz")
mock_information_schema = mock.MagicMock()
mock_information_schema.database = "awsdatacatalog"

self.adapter.acquire_connection("dummy")
actual = self.adapter._get_one_catalog(
mock_information_schema,
{"foo", "quux", "baz"},
self.used_schemas,
)
actual = self.adapter._get_one_catalog(mock_information_schema, {"foo", "quux"}, self.used_schemas)

expected_column_names = (
"table_database",
Expand All @@ -443,14 +438,16 @@ def test__get_one_catalog(self, mock_aws_service):
("awsdatacatalog", "quux", "bar", "table", None, "id", 0, "string", None),
("awsdatacatalog", "quux", "bar", "table", None, "country", 1, "string", None),
("awsdatacatalog", "quux", "bar", "table", None, "dt", 2, "date", None),
("awsdatacatalog", "baz", "qux", "table", None, "id", 0, "string", None),
("awsdatacatalog", "baz", "qux", "table", None, "country", 1, "string", None),
]
assert actual.column_names == expected_column_names
assert len(actual.rows) == len(expected_rows)
for row in actual.rows.values():
assert row.values() in expected_rows

mock_aws_service.create_table_without_type(table_name="qux", database_name="baz")
with pytest.raises(ValueError):
self.adapter._get_one_catalog(mock_information_schema, {"baz"}, self.used_schemas)

@mock_aws
def test__get_one_catalog_by_relations(self, mock_aws_service):
mock_aws_service.create_data_catalog()
Expand Down
23 changes: 13 additions & 10 deletions tests/unit/test_relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@


class TestRelation:
    """Unit tests for ``get_table_type``."""

    @pytest.mark.parametrize(
        ("table", "expected"),
        [
            ({"Name": "n", "TableType": "table"}, TableType.TABLE),
            # GOVERNED is mapped to a plain table in RELATION_TYPE_MAP.
            ({"Name": "n", "TableType": "GOVERNED"}, TableType.TABLE),
            ({"Name": "n", "TableType": "VIRTUAL_VIEW"}, TableType.VIEW),
            ({"Name": "n", "TableType": "EXTERNAL_TABLE", "Parameters": {"table_type": "ICEBERG"}}, TableType.ICEBERG),
        ],
    )
    def test__get_relation_type(self, table, expected):
        """Each supported input resolves to the expected ``TableType``."""
        assert get_table_type(table) == expected

    def test__get_relation_type_with_no_type(self):
        """A table without ``TableType`` (and not Iceberg) is rejected."""
        with pytest.raises(ValueError):
            get_table_type({"Name": "name"})

    def test__get_relation_type_with_unknown_type(self):
        """A ``TableType`` absent from ``RELATION_TYPE_MAP`` is rejected."""
        with pytest.raises(ValueError):
            get_table_type({"Name": "name", "TableType": "test"})


class TestAthenaRelation:
Expand Down
Loading