From 550a5d12e1c13ada2ade097c5b9a59537b6ad9d8 Mon Sep 17 00:00:00 2001
From: Ran Ever-Hadani
Date: Fri, 26 Jun 2020 19:29:23 -0700
Subject: [PATCH] Make partition metadata available to BigQuery users

---
 .gitignore                                    |  1 +
 core/dbt/adapters/base/impl.py                | 14 ++++++
 .../dbt/adapters/bigquery/connections.py      | 44 +++++++++++++++++++
 .../dbt/include/bigquery/macros/etc.sql       |  8 ++++
 4 files changed, 67 insertions(+)

diff --git a/.gitignore b/.gitignore
index d1025d4ef4c..833f5b37564 100644
--- a/.gitignore
+++ b/.gitignore
@@ -83,3 +83,4 @@ target/
 
 # pycharm
 .idea/
+.vscode/launch.json
diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 6af38b5264b..ace549ced1e 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -228,6 +228,20 @@ def execute(
             fetch=fetch
         )
 
+    @available.parse(lambda *a, **k: ('', empty_table()))
+    def get_partitions_metadata(
+        self, table_id: str
+    ) -> Tuple[agate.Table]:
+        """Obtain partitions metadata for a BigQuery partitioned table.
+
+        :param str table_id: a partitioned table id, in standard SQL format.
+        :return: partition metadata, as described in https://cloud.google.com/bigquery/docs/creating-partitioned-tables#getting_partition_metadata_using_meta_tables.
+        :rtype: agate.Table
+        """
+        return self.connections.get_partitions_metadata(
+            table_id=table_id
+        )
+
     ###
     # Methods that should never be overridden
     ###
diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py
index b62dba5320d..399d1862ea7 100644
--- a/plugins/bigquery/dbt/adapters/bigquery/connections.py
+++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -267,6 +267,50 @@ def execute(self, sql, auto_begin=False, fetch=None):
 
         return status, res
 
+    # The following method intentionally violates DRY, in that it is mostly
+    # copy-pasted from raw_execute(). This is done in order to discourage
+    # use of legacySQL queries in DBT, except to obtain partition metadata.
+    # This method should be removed once partition metadata becomes available
+    # from standardSQL.
+    def _raw_execute_legacy_sql(self, sql, fetch=False):
+        conn = self.get_thread_connection()
+        client = conn.handle
+
+        logger.debug('On {}: {}', conn.name, sql)
+
+        job_params = {'use_legacy_sql': True}
+
+        priority = conn.credentials.priority
+        if priority == Priority.Batch:
+            job_params['priority'] = google.cloud.bigquery.QueryPriority.BATCH
+        else:
+            job_params[
+                'priority'] = google.cloud.bigquery.QueryPriority.INTERACTIVE
+
+        maximum_bytes_billed = conn.credentials.maximum_bytes_billed
+        if maximum_bytes_billed is not None and maximum_bytes_billed != 0:
+            job_params['maximum_bytes_billed'] = maximum_bytes_billed
+
+        def fn():
+            return self._query_and_results(client, sql, conn, job_params)
+
+        query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)
+
+        return query_job, iterator
+
+    def get_partitions_metadata(self, table_id):
+        def standard_to_legacy(table_id):
+            table_ref = google.cloud.bigquery.table.TableReference.from_string(table_id)
+            return (table_ref.project + ':' + table_ref.dataset_id + '.' + table_ref.table_id).replace('`','')
+
+        legacy_sql = 'SELECT * FROM [' + standard_to_legacy(table_id) + '$__PARTITIONS_SUMMARY__]'
+
+        sql = self._add_query_comment(legacy_sql)
+        # fetch is passed for parity with raw_execute(); the helper ignores it
+        _, iterator = self._raw_execute_legacy_sql(sql, fetch='fetch_result')
+
+        return self.get_table_from_response(iterator)
+
     def create_bigquery_table(self, database, schema, table_name, callback,
                               sql):
         """Create a bigquery table. The caller must supply a callback
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/etc.sql b/plugins/bigquery/dbt/include/bigquery/macros/etc.sql
index a10ad1a5bd0..2091133e878 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/etc.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/etc.sql
@@ -5,3 +5,11 @@
 {% macro grant_access_to(entity, entity_type, role, grant_target_dict) -%}
   {% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
 {% endmacro %}
+
+{%- macro get_partitions_metadata(table) -%}
+  {%- if execute -%}
+    {%- set res = adapter.get_partitions_metadata(table) -%}
+    {{- return(res) -}}
+  {%- endif -%}
+  {{- return(None) -}}
+{%- endmacro -%}
\ No newline at end of file