diff --git a/CHANGELOG.md b/CHANGELOG.md
index 135df3fbe..9075551c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,14 @@
 
 - Cast `table_owner` to string to avoid errors generating docs ([#158](https://github.com/fishtown-analytics/dbt-spark/pull/158), [#159](https://github.com/fishtown-analytics/dbt-spark/pull/159))
 
+### Under the hood
+
+- Parse information returned by `list_relations_without_caching` macro to speed up catalog generation ([#93](https://github.com/fishtown-analytics/dbt-spark/issues/93), [#160](https://github.com/fishtown-analytics/dbt-spark/pull/160))
+
 ### Contributors
 - [@friendofasquid](https://github.com/friendofasquid) ([#159](https://github.com/fishtown-analytics/dbt-spark/pull/159))
+- [@franloza](https://github.com/franloza) ([#160](https://github.com/fishtown-analytics/dbt-spark/pull/160))
+
 
 
 ## dbt-spark 0.19.1 (Release TBD)
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index a12e7a940..93cfba687 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -1,3 +1,4 @@
+import re
 from concurrent.futures import Future
 from dataclasses import dataclass
 from typing import Optional, List, Dict, Any, Union, Iterable
@@ -60,6 +61,11 @@ class SparkAdapter(SQLAdapter):
         'stats:rows:description',
         'stats:rows:include',
     )
+    INFORMATION_COLUMNS_REGEX = re.compile(
+        r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
+    INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
+    INFORMATION_STATISTICS_REGEX = re.compile(
+        r"^Statistics: (.*)$", re.MULTILINE)
 
     Relation = SparkRelation
     Column = SparkColumn
@@ -139,7 +145,8 @@ def list_relations_without_caching(
                 schema=_schema,
                 identifier=name,
                 type=rel_type,
-                is_delta=is_delta
+                information=information,
+                is_delta=is_delta,
             )
             relations.append(relation)
 
@@ -197,19 +204,54 @@ def find_table_information_separator(rows: List[dict]) -> int:
         return pos
 
     def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
-        rows: List[agate.Row] = super().get_columns_in_relation(relation)
-        return self.parse_describe_extended(relation, rows)
+        cached_relations = self.cache.get_relations(
+            relation.database, relation.schema)
+        cached_relation = next((cached_relation
+                                for cached_relation in cached_relations
+                                if str(cached_relation) == str(relation)),
+                               None)
+        if cached_relation is None:
+            rows: List[agate.Row] = super().get_columns_in_relation(relation)
+            columns = self.parse_describe_extended(relation, rows)
+        else:
+            columns = self.parse_columns_from_information(cached_relation)
+        return columns
+
+    def parse_columns_from_information(
+        self, relation: SparkRelation
+    ) -> List[SparkColumn]:
+        owner_match = re.findall(
+            self.INFORMATION_OWNER_REGEX, relation.information)
+        owner = owner_match[0] if owner_match else None
+        matches = re.finditer(
+            self.INFORMATION_COLUMNS_REGEX, relation.information)
+        columns = []
+        stats_match = re.findall(
+            self.INFORMATION_STATISTICS_REGEX, relation.information)
+        raw_table_stats = stats_match[0] if stats_match else None
+        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
+        for match_num, match in enumerate(matches):
+            column_name, column_type, nullable = match.groups()
+            column = SparkColumn(
+                table_database=None,
+                table_schema=relation.schema,
+                table_name=relation.table,
+                table_type=relation.type,
+                column_index=match_num,
+                table_owner=owner,
+                column=column_name,
+                dtype=column_type,
+                table_stats=table_stats
+            )
+            columns.append(column)
+        return columns
 
     def _get_columns_for_catalog(
         self, relation: SparkRelation
     ) -> Iterable[Dict[str, Any]]:
-        properties = self.get_properties(relation)
-        columns = self.get_columns_in_relation(relation)
-        owner = properties.get(KEY_TABLE_OWNER)
+        columns = self.parse_columns_from_information(relation)
 
         for column in columns:
-            if owner:
-                column.table_owner = owner
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict['column_name'] = as_dict.pop('column', None)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 507f51d3b..5fc096550 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
     include_policy: SparkIncludePolicy = SparkIncludePolicy()
     quote_character: str = '`'
     is_delta: Optional[bool] = None
+    information: str = None
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py
index 5e50e3100..d886ddee3 100644
--- a/test/unit/test_adapter.py
+++ b/test/unit/test_adapter.py
@@ -446,3 +446,177 @@ def test_profile_with_cluster_and_sql_endpoint(self):
         }
         with self.assertRaises(RuntimeException):
             config_from_parts_or_dicts(self.project_cfg, profile)
+
+    def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
+        self.maxDiff = None
+        rel_type = SparkRelation.get_relation_type.Table
+
+        # Mimics the output of Spark in the information column
+        information = (
+            "Database: default_schema\n"
+            "Table: mytable\n"
+            "Owner: root\n"
+            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
+            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
+            "Created By: Spark 3.0.1\n"
+            "Type: MANAGED\n"
+            "Provider: delta\n"
+            "Statistics: 123456789 bytes\n"
+            "Location: /mnt/vo\n"
+            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
+            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
+            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
+            "Partition Provider: Catalog\n"
+            "Partition Columns: [`dt`]\n"
+            "Schema: root\n"
+            " |-- col1: decimal(22,0) (nullable = true)\n"
+            " |-- col2: string (nullable = true)\n"
+            " |-- dt: date (nullable = true)\n"
+        )
+        relation = SparkRelation.create(
+            schema='default_schema',
+            identifier='mytable',
+            type=rel_type,
+            information=information
+        )
+
+        config = self._get_target_http(self.project_cfg)
+        columns = SparkAdapter(config).parse_columns_from_information(
+            relation)
+        self.assertEqual(len(columns), 3)
+        self.assertEqual(columns[0].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'col1',
+            'column_index': 0,
+            'dtype': 'decimal(22,0)',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 123456789,
+        })
+
+    def test_parse_columns_from_information_with_view_type(self):
+        self.maxDiff = None
+        rel_type = SparkRelation.get_relation_type.View
+        information = (
+            "Database: default_schema\n"
+            "Table: myview\n"
+            "Owner: root\n"
+            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
+            "Last Access: UNKNOWN\n"
+            "Created By: Spark 3.0.1\n"
+            "Type: VIEW\n"
+            "View Text: WITH base (\n"
+            " SELECT * FROM source_table\n"
+            ")\n"
+            "SELECT col1, col2, dt FROM base\n"
+            "View Original Text: WITH base (\n"
+            " SELECT * FROM source_table\n"
+            ")\n"
+            "SELECT col1, col2, dt FROM base\n"
+            "View Catalog and Namespace: spark_catalog.default\n"
+            "View Query Output Columns: [col1, col2, dt]\n"
+            "Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
+            "transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
+            "view.catalogAndNamespace.part.0=spark_catalog, "
+            "view.catalogAndNamespace.part.1=default]\n"
+            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
+            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
+            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
+            "Storage Properties: [serialization.format=1]\n"
+            "Schema: root\n"
+            " |-- col1: decimal(22,0) (nullable = true)\n"
+            " |-- col2: string (nullable = true)\n"
+            " |-- dt: date (nullable = true)\n"
+        )
+        relation = SparkRelation.create(
+            schema='default_schema',
+            identifier='myview',
+            type=rel_type,
+            information=information
+        )
+
+        config = self._get_target_http(self.project_cfg)
+        columns = SparkAdapter(config).parse_columns_from_information(
+            relation)
+        self.assertEqual(len(columns), 3)
+        self.assertEqual(columns[1].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'col2',
+            'column_index': 1,
+            'dtype': 'string',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None
+        })
+
+    def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
+        self.maxDiff = None
+        rel_type = SparkRelation.get_relation_type.Table
+
+        information = (
+            "Database: default_schema\n"
+            "Table: mytable\n"
+            "Owner: root\n"
+            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
+            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
+            "Created By: Spark 3.0.1\n"
+            "Type: MANAGED\n"
+            "Provider: parquet\n"
+            "Statistics: 1234567890 bytes, 12345678 rows\n"
+            "Location: /mnt/vo\n"
+            "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
+            "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
+            "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
+            "Schema: root\n"
+            " |-- col1: decimal(22,0) (nullable = true)\n"
+            " |-- col2: string (nullable = true)\n"
+            " |-- dt: date (nullable = true)\n"
+        )
+        relation = SparkRelation.create(
+            schema='default_schema',
+            identifier='mytable',
+            type=rel_type,
+            information=information
+        )
+
+        config = self._get_target_http(self.project_cfg)
+        columns = SparkAdapter(config).parse_columns_from_information(
+            relation)
+        self.assertEqual(len(columns), 3)
+        self.assertEqual(columns[2].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'dt',
+            'column_index': 2,
+            'dtype': 'date',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 1234567890,
+
+            'stats:rows:description': '',
+            'stats:rows:include': True,
+            'stats:rows:label': 'rows',
+            'stats:rows:value': 12345678
+        })
+
diff --git a/test/unit/test_column.py b/test/unit/test_column.py
new file mode 100644
index 000000000..f7f8d8776
--- /dev/null
+++ b/test/unit/test_column.py
@@ -0,0 +1,38 @@
+import unittest
+
+from dbt.adapters.spark import SparkColumn
+
+
+class TestSparkColumn(unittest.TestCase):
+
+    def test_convert_table_stats_with_no_statistics(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats(None),
+            {}
+        )
+
+    def test_convert_table_stats_with_bytes(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats("123456789 bytes"),
+            {
+                'stats:bytes:description': '',
+                'stats:bytes:include': True,
+                'stats:bytes:label': 'bytes',
+                'stats:bytes:value': 123456789
+            }
+        )
+
+    def test_convert_table_stats_with_bytes_and_rows(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats("1234567890 bytes, 12345678 rows"),
+            {
+                'stats:bytes:description': '',
+                'stats:bytes:include': True,
+                'stats:bytes:label': 'bytes',
+                'stats:bytes:value': 1234567890,
+                'stats:rows:description': '',
+                'stats:rows:include': True,
+                'stats:rows:label': 'rows',
+                'stats:rows:value': 12345678
+            }
+        )
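
Note (not part of the patch above): the snippet below is a minimal, self-contained sketch of how the three regexes added to SparkAdapter pull the owner, table statistics, and column definitions out of the `information` text that `list_relations_without_caching` now stores on each SparkRelation. The sample blob is trimmed from the unit-test fixtures above; the variable names are illustrative only.

import re

# Same patterns as the new SparkAdapter class attributes in impl.py.
INFORMATION_COLUMNS_REGEX = re.compile(
    r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)

# Trimmed-down example of the `information` blob (see the test fixtures above).
information = (
    "Owner: root\n"
    "Statistics: 1234567890 bytes, 12345678 rows\n"
    "Schema: root\n"
    " |-- col1: decimal(22,0) (nullable = true)\n"
    " |-- dt: date (nullable = true)\n"
)

owner = INFORMATION_OWNER_REGEX.findall(information)[0]
stats = INFORMATION_STATISTICS_REGEX.findall(information)[0]
columns = [m.groups() for m in INFORMATION_COLUMNS_REGEX.finditer(information)]

print(owner)    # root
print(stats)    # 1234567890 bytes, 12345678 rows
print(columns)  # [('col1', 'decimal(22,0)', 'true'), ('dt', 'date', 'true')]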