From 72d83f988e3c0f6c22e21fdd4b87681f7452f843 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Wed, 30 Oct 2019 20:38:23 -0600 Subject: [PATCH 1/5] Make bigquery adapters SQLAdapters Implement more things via macros Refactor Relations vs InformationSchemas to handle BQ better Fix a bug where bigquery cached uppercase schema names wrong - by using information_schema this just goes away :) --- core/dbt/adapters/base/impl.py | 2 +- core/dbt/adapters/base/relation.py | 141 +++++++++++++----- core/dbt/adapters/cache.py | 5 +- core/dbt/adapters/sql/impl.py | 5 +- .../global_project/macros/adapters/common.sql | 20 +-- .../dbt/adapters/bigquery/connections.py | 9 +- .../bigquery/dbt/adapters/bigquery/impl.py | 50 +------ .../dbt/adapters/bigquery/relation.py | 54 +++---- .../dbt/include/bigquery/macros/adapters.sql | 67 +++++++-- .../dbt/include/bigquery/macros/catalog.sql | 8 +- .../case-sensitive-models/model.sql | 5 + .../test_bigquery_case_sensitive_dataset.py | 24 +++ test/integration/base.py | 12 +- 13 files changed, 259 insertions(+), 143 deletions(-) create mode 100644 test/integration/022_bigquery_test/case-sensitive-models/model.sql create mode 100644 test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 78387e985fc..eea5596993b 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -334,7 +334,7 @@ def _get_cache_schemas( # result is a map whose keys are information_schema Relations without # identifiers that have appropriate database prefixes, and whose values # are sets of lowercase schema names that are valid members of those - # schemas + # databases return info_schema_name_map def _relations_cache_for_schemas(self, manifest: Manifest) -> None: diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py index 55224863dd6..d429fb7f30b 100644 --- a/core/dbt/adapters/base/relation.py +++ b/core/dbt/adapters/base/relation.py @@ -6,7 +6,7 @@ from collections.abc import Mapping, Hashable from dataclasses import dataclass, fields from typing import ( - Optional, TypeVar, Generic, Any, Type, Dict, Union, List + Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple ) from typing_extensions import Protocol @@ -106,6 +106,21 @@ class Path(_ComponentObject[Optional[str]]): schema: Optional[str] identifier: Optional[str] + def __post_init__(self): + # handle pesky jinja2.Undefined sneaking in here and messing up render + if not isinstance(self.database, (type(None), str)): + raise dbt.exceptions.CompilationException( + 'Got an invalid path database: {}'.format(self.database) + ) + if not isinstance(self.schema, (type(None), str)): + raise dbt.exceptions.CompilationException( + 'Got an invalid path schema: {}'.format(self.schema) + ) + if not isinstance(self.identifier, (type(None), str)): + raise dbt.exceptions.CompilationException( + 'Got an invalid path identifier: {}'.format(self.identifier) + ) + def get_lowered_part(self, key: ComponentName) -> Optional[str]: part = self.get_part(key) if part is not None: @@ -193,6 +208,9 @@ def matches( return exact_match + def replace_path(self, **kwargs): + return self.replace(path=self.path.replace(**kwargs)) + def quote( self: Self, database: Optional[bool] = None, @@ -223,46 +241,32 @@ def include( new_include_policy = self.include_policy.replace_dict(policy) return self.replace(include_policy=new_include_policy) - def information_schema(self: Self, identifier=None) -> Self: - 
include_policy = self.include_policy.replace( - database=self.database is not None, - schema=True, - identifier=identifier is not None - ) - quote_policy = self.quote_policy.replace( - schema=False, - identifier=False, - ) - - path = self.path.replace( - schema='information_schema', - identifier=identifier, - ) + def information_schema(self, view_name=None) -> 'InformationSchema': + # some of our data comes from jinja, where things can be `Undefined`. + if not isinstance(view_name, str): + view_name = None - return self.replace( - quote_policy=quote_policy, - include_policy=include_policy, - path=path, - ) + return InformationSchema.from_relation(self, view_name) - def information_schema_only(self: Self) -> Self: + def information_schema_only(self) -> 'InformationSchema': return self.information_schema() - def information_schema_table(self: Self, identifier: str) -> Self: - return self.information_schema(identifier) + def _render_iterator( + self + ) -> Iterator[Tuple[Optional[ComponentName], Optional[str]]]: - def render(self) -> str: - parts: List[str] = [] + for key in ComponentName: + path_part: Optional[str] = None + if self.include_policy.get_part(key): + path_part = self.path.get_part(key) + if path_part is not None and self.quote_policy.get_part(key): + path_part = self.quoted(path_part) + yield key, path_part - for k in ComponentName: - if self.include_policy.get_part(k): - path_part = self.path.get_part(k) - - if path_part is not None: - part: str = path_part - if self.quote_policy.get_part(k): - part = self.quoted(path_part) - parts.append(part) + def render(self) -> str: + parts: List[str] = [ + part for _, part in self._render_iterator() if part is not None + ] if len(parts) == 0: raise dbt.exceptions.RuntimeException( @@ -417,3 +421,70 @@ def External(cls) -> str: @classproperty def RelationType(cls) -> Type[RelationType]: return RelationType + + +@dataclass(frozen=True, eq=False, repr=False) +class InformationSchema(BaseRelation): + information_schema_view: Optional[str] = None + + def __post_init__(self): + if not isinstance(self.information_schema_view, (type(None), str)): + raise dbt.exceptions.CompilationException( + 'Got an invalid name: {}'.format(self.information_schema_view) + ) + + @classmethod + def get_path( + cls, relation: BaseRelation, information_schema_view: Optional[str] + ) -> Path: + return Path( + database=relation.database, + schema=relation.schema, + identifier='INFORMATION_SCHEMA', + ) + + @classmethod + def get_include_policy( + cls, + relation, + information_schema_view: Optional[str], + ) -> Policy: + return relation.include_policy.replace( + database=relation.database is not None, + schema=False, + identifier=True, + ) + + @classmethod + def get_quote_policy( + cls, + relation, + information_schema_view: Optional[str], + ) -> Policy: + return relation.quote_policy.replace( + identifier=False, + ) + + @classmethod + def from_relation( + cls: Self, + relation: BaseRelation, + information_schema_view: Optional[str], + ) -> Self: + include_policy = cls.get_include_policy( + relation, information_schema_view + ) + quote_policy = cls.get_quote_policy(relation, information_schema_view) + path = cls.get_path(relation, information_schema_view) + return cls( + type=RelationType.View, + path=path, + include_policy=include_policy, + quote_policy=quote_policy, + information_schema_view=information_schema_view, + ) + + def _render_iterator(self): + for k, v in super()._render_iterator(): + yield k, v + yield None, self.information_schema_view diff --git 
a/core/dbt/adapters/cache.py b/core/dbt/adapters/cache.py index b0525ada252..ca858cb95d3 100644 --- a/core/dbt/adapters/cache.py +++ b/core/dbt/adapters/cache.py @@ -469,12 +469,13 @@ def get_relations(self, database, schema): :return List[BaseRelation]: The list of relations with the given schema """ + database = _lower(database) schema = _lower(schema) with self.lock: results = [ r.inner for r in self.relations.values() - if (r.schema == _lower(schema) and - r.database == _lower(database)) + if (_lower(r.schema) == schema and + _lower(r.database) == database) ] if None in results: diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py index a759eceec5d..63ef8109b77 100644 --- a/core/dbt/adapters/sql/impl.py +++ b/core/dbt/adapters/sql/impl.py @@ -222,9 +222,12 @@ def list_schemas(self, database): def check_schema_exists(self, database, schema): information_schema = self.Relation.create( - database=database, schema=schema, + database=database, + schema=schema, + identifier='INFORMATION_SCHEMA', quote_policy=self.config.quoting ).information_schema() + information_schema.render() kwargs = {'information_schema': information_schema, 'schema': schema} results = self.execute_macro( diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql index 7725f2e0981..56159e26f78 100644 --- a/core/dbt/include/global_project/macros/adapters/common.sql +++ b/core/dbt/include/global_project/macros/adapters/common.sql @@ -182,9 +182,9 @@ {% macro default__information_schema_name(database) -%} {%- if database -%} - {{ adapter.quote_as_configured(database, 'database') }}.information_schema + {{ adapter.quote_as_configured(database, 'database') }}.INFORMATION_SCHEMA {%- else -%} - information_schema + INFORMATION_SCHEMA {%- endif -%} {%- endmacro %} @@ -194,12 +194,12 @@ {% endmacro %} {% macro default__list_schemas(database) -%} - {% call statement('list_schemas', fetch_result=True, auto_begin=False) %} + {% set sql %} select distinct schema_name - from {{ information_schema_name(database) }}.schemata + from {{ information_schema_name(database) }}.SCHEMATA where catalog_name ilike '{{ database }}' - {% endcall %} - {{ return(load_result('list_schemas').table) }} + {% endset %} + {{ return(run_query(sql)) }} {% endmacro %} @@ -208,13 +208,13 @@ {% endmacro %} {% macro default__check_schema_exists(information_schema, schema) -%} - {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%} + {% set sql -%} select count(*) - from {{ information_schema }}.schemata + from {{ information_schema.replace(information_schema_view='SCHEMATA') }} where catalog_name='{{ information_schema.database }}' and schema_name='{{ schema }}' - {%- endcall %} - {{ return(load_result('check_schema_exists').table) }} + {%- endset %} + {{ return(run_query(sql)) }} {% endmacro %} diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 51f37b5bfb2..e2fc2f4f876 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -296,7 +296,14 @@ def drop_dataset(self, database, schema): client = conn.handle with self.exception_handler('drop dataset'): - for table in client.list_tables(dataset): + try: + tables = list(client.list_tables(dataset)) + except google.api_core.exceptions.NotFound: + # the dataset doesn't exist. return here to match + # 'drop schema if exists' behavior. 
If anything 404s after this + # then there are real problems that should cause us to raise. + return + for table in tables: client.delete_table(table.reference) client.delete_dataset(dataset) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 74a3b4fdebc..c116b20adfc 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -4,7 +4,8 @@ import dbt.clients.gcloud import dbt.clients.agate_helper -from dbt.adapters.base import BaseAdapter, available, RelationType +from dbt.adapters.base import available, RelationType +from dbt.adapters.sql import SQLAdapter from dbt.adapters.bigquery.relation import ( BigQueryRelation ) @@ -33,7 +34,7 @@ def _stub_relation(*args, **kwargs): ) -class BigQueryAdapter(BaseAdapter): +class BigQueryAdapter(SQLAdapter): RELATION_TYPES = { 'TABLE': RelationType.Table, @@ -82,21 +83,6 @@ def rename_relation(self, from_relation, to_relation): '`rename_relation` is not implemented for this adapter!' ) - @available - def list_schemas(self, database): - conn = self.connections.get_thread_connection() - client = conn.handle - - with self.connections.exception_handler('list dataset'): - # this is similar to how we have to deal with listing tables - all_datasets = client.list_datasets(project=database, - max_results=10000) - return [ds.dataset_id for ds in all_datasets] - - @available - def check_schema_exists(self, database, schema): - return super().check_schema_exists(database, schema) - def get_columns_in_relation(self, relation): try: table = self.connections.get_bq_table( @@ -118,33 +104,6 @@ def expand_target_column_types(self, from_relation, to_relation): # This is a no-op on BigQuery pass - def list_relations_without_caching(self, information_schema, schema): - connection = self.connections.get_thread_connection() - client = connection.handle - - bigquery_dataset = self.connections.dataset( - information_schema.database, schema, connection - ) - - all_tables = client.list_tables( - bigquery_dataset, - # BigQuery paginates tables by alphabetizing them, and using - # the name of the last table on a page as the key for the - # next page. If that key table gets dropped before we run - # list_relations, then this will 404. So, we avoid this - # situation by making the page size sufficiently large. - # see: https://github.com/fishtown-analytics/dbt/issues/726 - # TODO: cache the list of relations up front, and then we - # won't need to do this - max_results=100000) - - # This will 404 if the dataset does not exist. 
This behavior mirrors - # the implementation of list_relations for other adapters - try: - return [self._bq_table_to_relation(table) for table in all_tables] - except google.api_core.exceptions.NotFound: - return [] - def get_relation(self, database, schema, identifier): if self._schema_is_cached(database, schema): # if it's in the cache, use the parent's model of going through @@ -167,9 +126,6 @@ def create_schema(self, database, schema): def drop_schema(self, database, schema): logger.debug('Dropping schema "{}.{}".', database, schema) - - if not self.check_schema_exists(database, schema): - return self.connections.drop_dataset(database, schema) @classmethod diff --git a/plugins/bigquery/dbt/adapters/bigquery/relation.py b/plugins/bigquery/dbt/adapters/bigquery/relation.py index 008693c74db..7cca7a1c203 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/relation.py +++ b/plugins/bigquery/dbt/adapters/bigquery/relation.py @@ -2,7 +2,7 @@ from typing import Optional from dbt.adapters.base.relation import ( - BaseRelation, ComponentName + BaseRelation, ComponentName, InformationSchema ) from dbt.utils import filter_null_values from typing import TypeVar @@ -45,32 +45,36 @@ def project(self): def dataset(self): return self.schema - def information_schema(self: Self, identifier=None) -> Self: - # BigQuery (usually) addresses information schemas at the dataset - # level. This method overrides the BaseRelation method to return an - # Information Schema relation as project.dataset.information_schem + def information_schema( + self, identifier: Optional[str] = None + ) -> 'BigQueryInformationSchema': + return BigQueryInformationSchema.from_relation(self, identifier) - include_policy = self.include_policy.replace( - database=self.database is not None, - schema=self.schema is not None, - identifier=True - ) - # Quote everything on BigQuery -- identifiers are case-sensitive, - # even when quoted. 
-        quote_policy = self.quote_policy.replace(
-            database=True,
-            schema=True,
-            identifier=True,
-        )
+    @classmethod
+    def get_include_policy(cls, relation, information_schema_view):
+        schema = True
+        if information_schema_view in ('SCHEMATA', 'SCHEMATA_OPTIONS', None):
+            schema = False
+
+        identifier = True
+        if information_schema_view == '__TABLES__':
+            identifier = False

-        path = self.path.replace(
-            schema=self.schema,
-            identifier='INFORMATION_SCHEMA'
-        )
-        return self.replace(
-            quote_policy=quote_policy,
-            include_policy=include_policy,
-            path=path,
+        return relation.quote_policy.replace(
+            schema=schema,
+            identifier=identifier,
         )
+
+    def replace(self, **kwargs):
+        if 'information_schema_view' in kwargs:
+            view = kwargs['information_schema_view']
+            # we also need to update the include policy, unless the caller did,
+            # in which case it's their problem
+            if 'include_policy' not in kwargs:
+                kwargs['include_policy'] = self.get_include_policy(self, view)
+        return super().replace(**kwargs)
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
index 719860ea40c..eb9f63c32be 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -89,21 +89,66 @@
 {% endmacro %}

-{% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
-  {{ return(adapter.list_relations_without_caching(information_schema, schema)) }}
+{% macro bigquery__list_schemas(database) -%}
+  {% set sql %}
+    select distinct schema_name
+    from {{ information_schema_name(database) }}.SCHEMATA
+    where UPPER(catalog_name) like UPPER('{{ database }}')
+  {% endset %}
+  {{ return(run_query(sql)) }}
 {% endmacro %}

+{% macro empty_table() %}
+  {# This is the only way I know in jinja to get an empty agate table #}
+  {% do store_result('_empty_table', '', None) %}
+  {{ return(load_result('_empty_table')['table']) }}
+{% endmacro %}

-{% macro bigquery__current_timestamp() -%}
-  CURRENT_TIMESTAMP()
-{%- endmacro %}
+{%- macro bigquery_similar_schemas(database, schema) -%}
+  {%- set sql -%}
+    select distinct schema_name
+    from {{ information_schema_name(database) }}.SCHEMATA
+    where UPPER(catalog_name) like UPPER('{{ database }}')
+      and UPPER(schema_name) like UPPER('{{ schema }}')
+  {%- endset -%}
+  {%- set schemas = [] -%}
+  {%- for row in run_query(sql) -%}
+    {%- do schemas.append(row['schema_name']) %}
+  {%- endfor -%}
+  {{ return(schemas) }}
+{%- endmacro -%}

-{% macro bigquery__list_schemas(database) %}
-  {{ return(adapter.list_schemas()) }}
-{% endmacro %}
+{% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
+  {# In bigquery, you can't query the full information schema, you can only do so
+  by schema (so 'database.schema.information_schema.tables'). But our schema
+  value is case-insensitive for annoying reasons involving quoting. So you
+  have to figure out what schemas match the given schema first, and query them each.
+ #} + {%- set schema_candidates = bigquery_similar_schemas(information_schema.database, schema) -%} + {%- if (schema_candidates | length) == 0 -%} + {{ return(empty_table()) }} + {%- endif -%} + {%- set query -%} + {%- for s in schema_candidates %} + select + table_catalog as database, + table_name as name, + table_schema as schema, + case when table_type = 'BASE TABLE' then 'table' + when table_type = 'VIEW' then 'view' + when table_type = 'EXTERNAL TABLE' then 'external' + else table_type + end as table_type + from {{ information_schema.replace(information_schema_view='TABLES') }} + {% if not loop.last %}union all{% endif %} + {%- endfor %} + {%- endset -%} + {{ return(run_query(query)) }} +{%- endmacro %} -{% macro bigquery__check_schema_exists(information_schema, schema) %} - {{ return(adapter.check_schema_exists(information_schema.database, schema)) }} -{% endmacro %} + +{% macro bigquery__current_timestamp() -%} + CURRENT_TIMESTAMP() +{%- endmacro %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql b/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql index 2d1b0f249c6..2f52549132f 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/catalog.sql @@ -11,7 +11,7 @@ schema_name as table_schema, location - from {{ information_schema.include(schema=False) }}.SCHEMATA + from {{ information_schema.replace(information_schema_view='SCHEMATA') }} ), @@ -35,7 +35,7 @@ REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name, REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name - from {{ information_schema.include(identifier=False) }}.__TABLES__ + from {{ information_schema.replace(information_schema_view='__TABLES__') }} ), @@ -92,7 +92,7 @@ is_partitioning_column, clustering_ordinal_position - from {{ information_schema }}.COLUMNS + from {{ information_schema.replace(information_schema_view='COLUMNS') }} where ordinal_position is not null ), @@ -105,7 +105,7 @@ data_type as column_type, column_name as base_column_name - from {{ information_schema }}.COLUMN_FIELD_PATHS + from {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }} where data_type not like 'STRUCT%' ), diff --git a/test/integration/022_bigquery_test/case-sensitive-models/model.sql b/test/integration/022_bigquery_test/case-sensitive-models/model.sql new file mode 100644 index 00000000000..1934d4cfc0c --- /dev/null +++ b/test/integration/022_bigquery_test/case-sensitive-models/model.sql @@ -0,0 +1,5 @@ +{{ config(materialized='incremental') }} +select 1 as id +{% if is_incremental() %} +this is a syntax error! 
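+{# the invalid line above is intentional: it only renders when is_incremental() is true, so the first run of this model succeeds and the second run must fail, which is what the test below asserts #}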
+{% endif %} diff --git a/test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py b/test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py new file mode 100644 index 00000000000..46d3933f11b --- /dev/null +++ b/test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py @@ -0,0 +1,24 @@ +from test.integration.base import DBTIntegrationTest, use_profile + + +class TestCaseSensitiveSchemaBigQueryRun(DBTIntegrationTest): + + @property + def schema(self): + return "BigQuerY_test_022" + + def unique_schema(self): + schema = self.schema + + to_return = "{}_{}".format(self.prefix, schema) + return to_return + + @property + def models(self): + return "case-sensitive-models" + + @use_profile('bigquery') + def test__bigquery_double_run_fails(self): + results = self.run_dbt() + self.assertEqual(len(results), 1) + self.run_dbt(expect_pass=False) diff --git a/test/integration/base.py b/test/integration/base.py index 0ee44552f14..c02470d48de 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -459,9 +459,7 @@ def _create_schema_named(self, database, schema): def _drop_schema_named(self, database, schema): if self.adapter_type == 'bigquery' or self.adapter_type == 'presto': - self.adapter.drop_schema( - database, schema - ) + self.adapter.drop_schema(database, schema) else: schema_fqn = self._get_schema_fqn(database, schema) self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn)) @@ -487,9 +485,11 @@ def _drop_schemas_sql(self): self._get_schema_fqn(self.default_database, schema) ) # on postgres/redshift, this will make you sad - drop_alternative = self.setup_alternate_db and \ - self.adapter_type not in {'postgres', 'redshift'} and \ - self.alternative_database + drop_alternative = ( + self.setup_alternate_db and + self.adapter_type not in {'postgres', 'redshift'} and + self.alternative_database + ) if drop_alternative: self._created_schemas.add( self._get_schema_fqn(self.alternative_database, schema) From 30cd27e5fc39c077ffe53b39dad1753a37b1860e Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Fri, 1 Nov 2019 08:51:29 -0600 Subject: [PATCH 2/5] Fix BQ list_relations_without_caching + check_schema_exists --- .../bigquery/dbt/adapters/bigquery/impl.py | 21 ++++++++++ .../dbt/include/bigquery/macros/adapters.sql | 42 +++++-------------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index c116b20adfc..af25dcda829 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -83,6 +83,27 @@ def rename_relation(self, from_relation, to_relation): '`rename_relation` is not implemented for this adapter!' ) + @available.parse(lambda *a, **k: False) + def check_schema_exists(self, database: str, schema: str) -> bool: + conn = self.connections.get_thread_connection() + client = conn.handle + + bigquery_dataset = self.connections.dataset( + database, schema, conn + ) + # try to do things with the dataset. If it doesn't exist it will 404. + # we have to do it this way to handle underscore-prefixed datasets, + # which appear in neither the information_schema.schemata view nor the + # list_datasets method. 
+        try:
+            next(client.list_tables(bigquery_dataset, max_results=1))
+        except StopIteration:
+            pass
+        except google.api_core.exceptions.NotFound:
+            # the schema does not exist
+            return False
+        return True
+
     def get_columns_in_relation(self, relation):
         try:
             table = self.connections.get_bq_table(
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
index eb9f63c32be..9589f148ba7 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -105,45 +105,23 @@
 {% endmacro %}

-{%- macro bigquery_similar_schemas(database, schema) -%}
-  {%- set sql -%}
-    select distinct schema_name
-    from {{ information_schema_name(database) }}.SCHEMATA
-    where UPPER(catalog_name) like UPPER('{{ database }}')
-      and UPPER(schema_name) like UPPER('{{ schema }}')
-  {%- endset -%}
-  {%- set schemas = [] -%}
-  {%- for row in run_query(sql) -%}
-    {%- do schemas.append(row['schema_name']) %}
-  {%- endfor -%}
-  {{ return(schemas) }}
-{%- endmacro -%}
-
-
 {% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
   {# In bigquery, you can't query the full information schema, you can only do so
   by schema (so 'database.schema.information_schema.tables'). But our schema
   value is case-insensitive for annoying reasons involving quoting. So you
   have to figure out what schemas match the given schema first, and query them each.
   #}
-  {%- set schema_candidates = bigquery_similar_schemas(information_schema.database, schema) -%}
-  {%- if (schema_candidates | length) == 0 -%}
-    {{ return(empty_table()) }}
-  {%- endif -%}
   {%- set query -%}
-    {%- for s in schema_candidates %}
-      select
-        table_catalog as database,
-        table_name as name,
-        table_schema as schema,
-        case when table_type = 'BASE TABLE' then 'table'
-             when table_type = 'VIEW' then 'view'
-             when table_type = 'EXTERNAL TABLE' then 'external'
-             else table_type
-        end as table_type
-      from {{ information_schema.replace(information_schema_view='TABLES') }}
-      {% if not loop.last %}union all{% endif %}
-    {%- endfor %}
+    select
+      table_catalog as database,
+      table_name as name,
+      table_schema as schema,
+      case when table_type = 'BASE TABLE' then 'table'
+           when table_type = 'VIEW' then 'view'
+           when table_type = 'EXTERNAL TABLE' then 'external'
+           else table_type
+      end as table_type
+    from {{ information_schema.replace(information_schema_view='TABLES') }}
   {%- endset -%}
   {{ return(run_query(query)) }}
 {%- endmacro %}

From ca4979099c001b7e19dd12f089bfa74cca71a367 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Fri, 1 Nov 2019 10:29:40 -0600
Subject: [PATCH 3/5] so much for being a SQLAdapter

---
 core/dbt/adapters/sql/impl.py                 |  1 -
 .../bigquery/dbt/adapters/bigquery/impl.py    | 45 +++++++++++++++++--
 .../dbt/include/bigquery/macros/adapters.sql  | 45 +++++--------------
 3 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py
index 63ef8109b77..e15bc143a2c 100644
--- a/core/dbt/adapters/sql/impl.py
+++ b/core/dbt/adapters/sql/impl.py
@@ -227,7 +227,6 @@ def check_schema_exists(self, database, schema):
             identifier='INFORMATION_SCHEMA',
             quote_policy=self.config.quoting
         ).information_schema()
-        information_schema.render()

         kwargs = {'information_schema': information_schema, 'schema': schema}
         results = self.execute_macro(
diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py
index af25dcda829..0a420db4edf
100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -4,8 +4,7 @@ import dbt.clients.gcloud import dbt.clients.agate_helper -from dbt.adapters.base import available, RelationType -from dbt.adapters.sql import SQLAdapter +from dbt.adapters.base import BaseAdapter, available, RelationType from dbt.adapters.bigquery.relation import ( BigQueryRelation ) @@ -34,7 +33,7 @@ def _stub_relation(*args, **kwargs): ) -class BigQueryAdapter(SQLAdapter): +class BigQueryAdapter(BaseAdapter): RELATION_TYPES = { 'TABLE': RelationType.Table, @@ -83,6 +82,17 @@ def rename_relation(self, from_relation, to_relation): '`rename_relation` is not implemented for this adapter!' ) + @available + def list_schemas(self, database): + conn = self.connections.get_thread_connection() + client = conn.handle + + with self.connections.exception_handler('list dataset'): + # this is similar to how we have to deal with listing tables + all_datasets = client.list_datasets(project=database, + max_results=10000) + return [ds.dataset_id for ds in all_datasets] + @available.parse(lambda *a, **k: False) def check_schema_exists(self, database: str, schema: str) -> bool: conn = self.connections.get_thread_connection() @@ -96,7 +106,7 @@ def check_schema_exists(self, database: str, schema: str) -> bool: # which appear in neither the information_schema.schemata view nor the # list_datasets method. try: - next(client.list_tables(bigquery_dataset, max_results=1)) + next(iter(client.list_tables(bigquery_dataset, max_results=1))) except StopIteration: pass except google.api_core.exceptions.NotFound: @@ -125,6 +135,33 @@ def expand_target_column_types(self, from_relation, to_relation): # This is a no-op on BigQuery pass + def list_relations_without_caching(self, information_schema, schema): + connection = self.connections.get_thread_connection() + client = connection.handle + + bigquery_dataset = self.connections.dataset( + information_schema.database, information_schema.schema, connection + ) + + all_tables = client.list_tables( + bigquery_dataset, + # BigQuery paginates tables by alphabetizing them, and using + # the name of the last table on a page as the key for the + # next page. If that key table gets dropped before we run + # list_relations, then this will 404. So, we avoid this + # situation by making the page size sufficiently large. + # see: https://github.com/fishtown-analytics/dbt/issues/726 + # TODO: cache the list of relations up front, and then we + # won't need to do this + max_results=100000) + + # This will 404 if the dataset does not exist. 
This behavior mirrors
+        # the implementation of list_relations for other adapters
+        try:
+            return [self._bq_table_to_relation(table) for table in all_tables]
+        except google.api_core.exceptions.NotFound:
+            return []
+
     def get_relation(self, database, schema, identifier):
         if self._schema_is_cached(database, schema):
             # if it's in the cache, use the parent's model of going through
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
index 9589f148ba7..126e6d3cd0f 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -89,44 +89,21 @@
 {% endmacro %}

-{% macro bigquery__list_schemas(database) -%}
-  {% set sql %}
-    select distinct schema_name
-    from {{ information_schema_name(database) }}.SCHEMATA
-    where UPPER(catalog_name) like UPPER('{{ database }}')
-  {% endset %}
-  {{ return(run_query(sql)) }}
-{% endmacro %}
-
-{% macro empty_table() %}
-  {# This is the only way I know in jinja to get an empty agate table #}
-  {% do store_result('_empty_table', '', None) %}
-  {{ return(load_result('_empty_table')['table']) }}
-{% endmacro %}
-
-
 {% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
-  {# In bigquery, you can't query the full information schema, you can only do so
-  by schema (so 'database.schema.information_schema.tables'). But our schema
-  value is case-insensitive for annoying reasons involving quoting. So you
-  have to figure out what schemas match the given schema first, and query them each.
-  #}
-  {%- set query -%}
-    select
-      table_catalog as database,
-      table_name as name,
-      table_schema as schema,
-      case when table_type = 'BASE TABLE' then 'table'
-           when table_type = 'VIEW' then 'view'
-           when table_type = 'EXTERNAL TABLE' then 'external'
-           else table_type
-      end as table_type
-    from {{ information_schema.replace(information_schema_view='TABLES') }}
-  {%- endset -%}
-  {{ return(run_query(query)) }}
+  {{ return(adapter.list_relations_without_caching(information_schema, schema)) }}
 {%- endmacro %}


 {% macro bigquery__current_timestamp() -%}
   CURRENT_TIMESTAMP()
 {%- endmacro %}
+
+
+{% macro bigquery__list_schemas(database) -%}
+  {{ return(adapter.list_schemas()) }}
+{% endmacro %}
+
+
+{% macro bigquery__check_schema_exists(information_schema, schema) %}
+  {{ return(adapter.check_schema_exists(information_schema.database, schema)) }}
+{% endmacro %}

From 1bbc00c346c9d9386f3ecc4eab0218d641e49ec8 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Mon, 4 Nov 2019 08:57:58 -0700
Subject: [PATCH 4/5] handle funky case bigquery models, add tests

---
 .../bigquery/dbt/adapters/bigquery/impl.py    | 87 +++++++++++++------
 .../{model.sql => fUnKyCaSe.sql}              |  0
 .../case-sensitive-schemas/model.sql          |  5 ++
 .../022_bigquery_test/models/fUnKyCaSe.sql    |  1 +
 .../022_bigquery_test/models/schema.yml       | 10 +++
 ...set.py => test_bigquery_case_sensitive.py} | 26 ++++--
 .../test_simple_bigquery_view.py              |  6 +-
 7 files changed, 99 insertions(+), 36 deletions(-)
 rename test/integration/022_bigquery_test/case-sensitive-models/{model.sql => fUnKyCaSe.sql} (100%)
 create mode 100644 test/integration/022_bigquery_test/case-sensitive-schemas/model.sql
 create mode 100644 test/integration/022_bigquery_test/models/fUnKyCaSe.sql
 rename test/integration/022_bigquery_test/{test_bigquery_case_sensitive_dataset.py => test_bigquery_case_sensitive.py} (60%)

diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py
b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 0a420db4edf..648cf2a87e0 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -1,3 +1,5 @@ +from typing import Dict, List, Optional, Any + import dbt.deprecations import dbt.exceptions import dbt.flags as flags @@ -6,12 +8,13 @@ from dbt.adapters.base import BaseAdapter, available, RelationType from dbt.adapters.bigquery.relation import ( - BigQueryRelation + BigQueryRelation, BigQueryInformationSchema ) from dbt.adapters.bigquery import BigQueryColumn from dbt.adapters.bigquery import BigQueryConnectionManager from dbt.contracts.connection import Connection from dbt.logger import GLOBAL_LOGGER as logger +from dbt.utils import filter_null_values import google.auth import google.api_core @@ -52,14 +55,14 @@ class BigQueryAdapter(BaseAdapter): ### @classmethod - def date_function(cls): + def date_function(cls) -> str: return 'CURRENT_TIMESTAMP()' @classmethod - def is_cancelable(cls): + def is_cancelable(cls) -> bool: return False - def drop_relation(self, relation): + def drop_relation(self, relation: BigQueryRelation) -> None: is_cached = self._schema_is_cached(relation.database, relation.schema) if is_cached: self.cache_dropped(relation) @@ -72,18 +75,20 @@ def drop_relation(self, relation): relation_object = dataset.table(relation.identifier) client.delete_table(relation_object) - def truncate_relation(self, relation): + def truncate_relation(self, relation: BigQueryRelation) -> None: raise dbt.exceptions.NotImplementedException( '`truncate` is not implemented for this adapter!' ) - def rename_relation(self, from_relation, to_relation): + def rename_relation( + self, from_relation: BigQueryRelation, to_relation: BigQueryRelation + ) -> None: raise dbt.exceptions.NotImplementedException( '`rename_relation` is not implemented for this adapter!' 
) @available - def list_schemas(self, database): + def list_schemas(self, database: str) -> List[str]: conn = self.connections.get_thread_connection() client = conn.handle @@ -114,7 +119,9 @@ def check_schema_exists(self, database: str, schema: str) -> bool: return False return True - def get_columns_in_relation(self, relation): + def get_columns_in_relation( + self, relation: BigQueryRelation + ) -> List[BigQueryColumn]: try: table = self.connections.get_bq_table( database=relation.database, @@ -127,15 +134,21 @@ def get_columns_in_relation(self, relation): logger.debug("get_columns_in_relation error: {}".format(e)) return [] - def expand_column_types(self, goal, current): + def expand_column_types( + self, goal: BigQueryRelation, current: BigQueryRelation + ) -> None: # This is a no-op on BigQuery pass - def expand_target_column_types(self, from_relation, to_relation): + def expand_target_column_types( + self, from_relation: BigQueryRelation, to_relation: BigQueryRelation + ) -> None: # This is a no-op on BigQuery pass - def list_relations_without_caching(self, information_schema, schema): + def list_relations_without_caching( + self, information_schema: BigQueryInformationSchema, schema: str + ) -> List[BigQueryRelation]: connection = self.connections.get_thread_connection() client = connection.handle @@ -162,7 +175,9 @@ def list_relations_without_caching(self, information_schema, schema): except google.api_core.exceptions.NotFound: return [] - def get_relation(self, database, schema, identifier): + def get_relation( + self, database: str, schema: str, identifier: str + ) -> BigQueryRelation: if self._schema_is_cached(database, schema): # if it's in the cache, use the parent's model of going through # the relations cache and picking out the relation @@ -178,47 +193,62 @@ def get_relation(self, database, schema, identifier): table = None return self._bq_table_to_relation(table) - def create_schema(self, database, schema): + def create_schema(self, database: str, schema: str) -> None: logger.debug('Creating schema "{}.{}".', database, schema) self.connections.create_dataset(database, schema) - def drop_schema(self, database, schema): + def drop_schema(self, database: str, schema: str) -> None: logger.debug('Dropping schema "{}.{}".', database, schema) self.connections.drop_dataset(database, schema) @classmethod - def quote(cls, identifier): + def quote(cls, identifier: str) -> str: return '`{}`'.format(identifier) @classmethod - def convert_text_type(cls, agate_table, col_idx): + def convert_text_type(cls, agate_table: agate.Table, col_idx: int) -> str: return "string" @classmethod - def convert_number_type(cls, agate_table, col_idx): + def convert_number_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: decimals = agate_table.aggregate(agate.MaxPrecision(col_idx)) return "float64" if decimals else "int64" @classmethod - def convert_boolean_type(cls, agate_table, col_idx): + def convert_boolean_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: return "bool" @classmethod - def convert_datetime_type(cls, agate_table, col_idx): + def convert_datetime_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: return "datetime" @classmethod - def convert_date_type(cls, agate_table, col_idx): + def convert_date_type(cls, agate_table: agate.Table, col_idx: int) -> str: return "date" @classmethod - def convert_time_type(cls, agate_table, col_idx): + def convert_time_type(cls, agate_table: agate.Table, col_idx: int) -> str: return "time" ### # Implementation 
details ### - def _get_dbt_columns_from_bq_table(self, table): + def _make_match_kwargs( + self, database: str, schema: str, identifier: str + ) -> Dict[str, str]: + return filter_null_values({ + 'database': database, + 'identifier': identifier, + 'schema': schema, + }) + + def _get_dbt_columns_from_bq_table(self, table) -> List[BigQueryColumn]: "Translates BQ SchemaField dicts into dbt BigQueryColumn objects" columns = [] @@ -231,7 +261,9 @@ def _get_dbt_columns_from_bq_table(self, table): return columns - def _agate_to_schema(self, agate_table, column_override): + def _agate_to_schema( + self, agate_table: agate.Table, column_override: Dict[str, str] + ) -> List[google.cloud.bigquery.SchemaField]: """Convert agate.Table with column names to a list of bigquery schemas. """ bq_schema = [] @@ -243,7 +275,7 @@ def _agate_to_schema(self, agate_table, column_override): ) return bq_schema - def _materialize_as_view(self, model): + def _materialize_as_view(self, model: Dict[str, Any]) -> str: model_database = model.get('database') model_schema = model.get('schema') model_alias = model.get('alias') @@ -258,7 +290,12 @@ def _materialize_as_view(self, model): ) return "CREATE VIEW" - def _materialize_as_table(self, model, model_sql, decorator=None): + def _materialize_as_table( + self, + model: Dict[str, Any], + model_sql: str, + decorator: Optional[str] = None, + ) -> str: model_database = model.get('database') model_schema = model.get('schema') model_alias = model.get('alias') diff --git a/test/integration/022_bigquery_test/case-sensitive-models/model.sql b/test/integration/022_bigquery_test/case-sensitive-models/fUnKyCaSe.sql similarity index 100% rename from test/integration/022_bigquery_test/case-sensitive-models/model.sql rename to test/integration/022_bigquery_test/case-sensitive-models/fUnKyCaSe.sql diff --git a/test/integration/022_bigquery_test/case-sensitive-schemas/model.sql b/test/integration/022_bigquery_test/case-sensitive-schemas/model.sql new file mode 100644 index 00000000000..1934d4cfc0c --- /dev/null +++ b/test/integration/022_bigquery_test/case-sensitive-schemas/model.sql @@ -0,0 +1,5 @@ +{{ config(materialized='incremental') }} +select 1 as id +{% if is_incremental() %} +this is a syntax error! 
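+{# as with the case-sensitive-models fixture, the invalid line above only renders on an incremental (second) run, so the second run of this model is expected to fail #}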
+{% endif %} diff --git a/test/integration/022_bigquery_test/models/fUnKyCaSe.sql b/test/integration/022_bigquery_test/models/fUnKyCaSe.sql new file mode 100644 index 00000000000..43258a71464 --- /dev/null +++ b/test/integration/022_bigquery_test/models/fUnKyCaSe.sql @@ -0,0 +1 @@ +select 1 as id diff --git a/test/integration/022_bigquery_test/models/schema.yml b/test/integration/022_bigquery_test/models/schema.yml index e4095b04cfd..031753900d4 100644 --- a/test/integration/022_bigquery_test/models/schema.yml +++ b/test/integration/022_bigquery_test/models/schema.yml @@ -39,3 +39,13 @@ models: - was_materialized: name: table_model type: table +- name: fUnKyCaSe + columns: + - name: id + tests: + - not_null + - unique + tests: + - was_materialized: + name: fUnKyCaSe + type: view diff --git a/test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py b/test/integration/022_bigquery_test/test_bigquery_case_sensitive.py similarity index 60% rename from test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py rename to test/integration/022_bigquery_test/test_bigquery_case_sensitive.py index 46d3933f11b..cecd85c7238 100644 --- a/test/integration/022_bigquery_test/test_bigquery_case_sensitive_dataset.py +++ b/test/integration/022_bigquery_test/test_bigquery_case_sensitive.py @@ -1,8 +1,24 @@ from test.integration.base import DBTIntegrationTest, use_profile -class TestCaseSensitiveSchemaBigQueryRun(DBTIntegrationTest): +class TestCaseSensitiveModelBigQueryRun(DBTIntegrationTest): + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "case-sensitive-models" + + @use_profile('bigquery') + def test__bigquery_double_run_fails(self): + results = self.run_dbt() + self.assertEqual(len(results), 1) + self.run_dbt(expect_pass=False) + +class TestCaseSensitiveSchemaBigQueryRun(TestCaseSensitiveModelBigQueryRun): + # same test, but the schema is funky instead of the model name @property def schema(self): return "BigQuerY_test_022" @@ -15,10 +31,4 @@ def unique_schema(self): @property def models(self): - return "case-sensitive-models" - - @use_profile('bigquery') - def test__bigquery_double_run_fails(self): - results = self.run_dbt() - self.assertEqual(len(results), 1) - self.run_dbt(expect_pass=False) + return "case-sensitive-schemas" diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py index b492e1b37f0..58072bbd214 100644 --- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py +++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py @@ -49,7 +49,7 @@ def test__bigquery_simple_run(self): self.run_dbt(['seed']) self.run_dbt(['seed', '--full-refresh']) results = self.run_dbt() - self.assertEqual(len(results), 5) + self.assertEqual(len(results), 6) self.assert_nondupes_pass() @@ -60,7 +60,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun): def test_bigquery_run_twice(self): self.run_dbt(['seed']) results = self.run_dbt() - self.assertEqual(len(results), 5) + self.assertEqual(len(results), 6) results = self.run_dbt() - self.assertEqual(len(results), 5) + self.assertEqual(len(results), 6) self.assert_nondupes_pass() From 670c26bdbda85eee35769ec89330f4b1f1029bea Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 4 Nov 2019 11:02:30 -0700 Subject: [PATCH 5/5] use exists_ok, not_found_ok, and delete_contents on bigquery dataset operations --- .../dbt/adapters/bigquery/connections.py | 22 ++++--------------- 
plugins/bigquery/setup.py | 2 +- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index e2fc2f4f876..f9e89cb0ce0 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -296,28 +296,14 @@ def drop_dataset(self, database, schema): client = conn.handle with self.exception_handler('drop dataset'): - try: - tables = list(client.list_tables(dataset)) - except google.api_core.exceptions.NotFound: - # the dataset doesn't exist. return here to match - # 'drop schema if exists' behavior. If anything 404s after this - # then there are real problems that should cause us to raise. - return - for table in tables: - client.delete_table(table.reference) - client.delete_dataset(dataset) + client.delete_dataset( + dataset, delete_contents=True, not_found_ok=True + ) def create_dataset(self, database, schema): conn = self.get_thread_connection() client = conn.handle dataset = self.dataset(database, schema, conn) - # Emulate 'create schema if not exists ...' - try: - client.get_dataset(dataset) - return - except google.api_core.exceptions.NotFound: - pass - with self.exception_handler('create dataset'): - client.create_dataset(dataset) + client.create_dataset(dataset, exists_ok=True) diff --git a/plugins/bigquery/setup.py b/plugins/bigquery/setup.py index 83ccb3550cf..d58743529d2 100644 --- a/plugins/bigquery/setup.py +++ b/plugins/bigquery/setup.py @@ -30,7 +30,7 @@ }, install_requires=[ 'dbt-core=={}'.format(package_version), - 'google-cloud-bigquery>=1.0.0,<2', + 'google-cloud-bigquery>=1.15.0,<2', ], zip_safe=False, )
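
For reference, PATCH 5/5 leans on keyword arguments that google-cloud-bigquery ships by 1.15.0, which is presumably why setup.py bumps the floor. A minimal sketch of the resulting dataset lifecycle — the project and dataset ids here are illustrative, not taken from the patch:

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # illustrative project id
    dataset_ref = client.dataset('my_dataset')      # 1.x-style DatasetReference

    # 'create schema if not exists': exists_ok replaces the old get_dataset() probe
    client.create_dataset(dataset_ref, exists_ok=True)

    # 'drop schema if exists ... cascade': delete_contents drops any tables first,
    # and not_found_ok swallows the 404 instead of the old list_tables() try/except
    client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)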