From 367eff77fcfd0c4facb213cfabdc416de2d3e90b Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Thu, 12 Mar 2020 15:56:43 -0400
Subject: [PATCH 1/3] (#2136) Fix missing sql_header for incremental models

---
 .../macros/materializations/common/merge.sql     |  6 ++++++
 .../snowflake/macros/materializations/merge.sql  |  3 +++
 .../models/sql_header_model_incr.sql             | 15 +++++++++++++++
 .../test_simple_bigquery_view.py                 |  6 +++---
 4 files changed, 27 insertions(+), 3 deletions(-)
 create mode 100644 test/integration/022_bigquery_test/models/sql_header_model_incr.sql

diff --git a/core/dbt/include/global_project/macros/materializations/common/merge.sql b/core/dbt/include/global_project/macros/materializations/common/merge.sql
index dcbcc1a356d..8098483207e 100644
--- a/core/dbt/include/global_project/macros/materializations/common/merge.sql
+++ b/core/dbt/include/global_project/macros/materializations/common/merge.sql
@@ -18,6 +18,7 @@
 {% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
     {%- set predicates = [] if predicates is none else [] + predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
 
     {% if unique_key %}
         {% set unique_key_match %}
@@ -28,6 +29,8 @@
         {% do predicates.append('FALSE') %}
     {% endif %}
 
+    {{ sql_header if sql_header is not none }}
+
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE
         on {{ predicates | join(' and ') }}
@@ -87,6 +90,9 @@
 {% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
     {%- set predicates = [] if predicates is none else [] + predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
+
+    {{ sql_header if sql_header is not none }}
 
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE

diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
index e3a5d5cd085..0c48eb9493a 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
@@ -7,9 +7,12 @@
     #}
 
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
 
     {%- if unique_key is none -%}
 
+        {{ sql_header if sql_header is not none }}
+
         insert into {{ target }} ({{ dest_cols_csv }})
         (
             select {{ dest_cols_csv }}

diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr.sql
new file mode 100644
index 00000000000..f93280a3bfd
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr.sql
@@ -0,0 +1,15 @@
+
+{{ config(materialized="incremental") }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select a_to_b(dupe) as dupe from {{ ref('view_model') }}

diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
index e25704ba1b4..f17ca9bcfcc 100644
--- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py
+++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
@@ -53,7 +53,7 @@ def test__bigquery_simple_run(self):
         self.run_dbt(['seed', '--full-refresh'])
         results = self.run_dbt()
         # Bump expected number of results when adding new model
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 9)
 
         self.assert_nondupes_pass()
 
@@ -64,7 +64,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun):
     def test_bigquery_run_twice(self):
         self.run_dbt(['seed'])
         results = self.run_dbt()
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 9)
         results = self.run_dbt()
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 9)
         self.assert_nondupes_pass()
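Note on the fix above: with `sql_header` now read from the config and prepended in `default__get_merge_sql`, the statement rendered for a model like `sql_header_model_incr.sql` takes roughly the shape below. This is a sketch, not captured dbt output: the project and dataset names are illustrative, and the `on FALSE` predicate reflects the no-unique-key path this test exercises.

    -- sql_header, extracted from the model and emitted ahead of the merge
    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
    RETURNS STRING AS (
      CASE WHEN LOWER(str) = 'a' THEN 'b' ELSE str END
    );

    merge into `proj`.`dset`.`sql_header_model_incr` as DBT_INTERNAL_DEST
        using (
            select a_to_b(dupe) as dupe from `proj`.`dset`.`view_model`
        ) as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched then insert
        (`dupe`)
    values
        (`dupe`)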
From 3a5eb4c2ade24e473f7f57804f83bd73a6a00acf Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Mon, 6 Apr 2020 17:58:00 -0400
Subject: [PATCH 2/3] Fix for bq insert_overwrite incrementals, add additional tests

---
 .../macros/materializations/common/merge.sql   |  8 ++---
 .../macros/materializations/incremental.sql    |  8 +++--
 ...sql_header_model_incr_insert_overwrite.sql  | 31 ++++++++++++++++++
 ...der_model_incr_insert_overwrite_static.sql  | 32 +++++++++++++++++++
 .../test_simple_bigquery_view.py               |  6 ++--
 5 files changed, 76 insertions(+), 9 deletions(-)
 create mode 100644 test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql
 create mode 100644 test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql

diff --git a/core/dbt/include/global_project/macros/materializations/common/merge.sql b/core/dbt/include/global_project/macros/materializations/common/merge.sql
index 8098483207e..778fdf8ac72 100644
--- a/core/dbt/include/global_project/macros/materializations/common/merge.sql
+++ b/core/dbt/include/global_project/macros/materializations/common/merge.sql
@@ -10,8 +10,8 @@
 {%- endmacro %}
 
 
-{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
-  {{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates) }}
+{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
+  {{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates, include_sql_header) }}
 {%- endmacro %}
 
 
@@ -87,12 +87,12 @@
 {% endmacro %}
 
 
-{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
+{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
     {%- set predicates = [] if predicates is none else [] + predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
     {%- set sql_header = config.get('sql_header', none) -%}
 
-    {{ sql_header if sql_header is not none }}
+    {{ sql_header if sql_header is not none and include_sql_header }}
 
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE

diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
index 18a0c0bc350..6aabf8d7521 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -34,7 +34,7 @@
       )
     {%- endset -%}
 
-    {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}
+    {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
 
   {% else %} {# dynamic #}
 
@@ -66,8 +66,12 @@
           from {{ tmp_relation }}
       );
 
+      {#
+          TODO: include_sql_header is a hack; consider a better approach that includes
+          the sql_header at the materialization-level instead
+      #}
       -- 3. run the merge statement
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
 
       -- 4. clean up the temp table
       drop table if exists {{ tmp_relation }}
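Why the new flag defaults to off in the dynamic branch: the dynamic insert_overwrite path renders one multi-statement BigQuery script, and the `create table` in step 1 already carries the model's sql_header, so re-emitting it ahead of the merge would declare the temporary UDF a second time in the same script. That is the double-injection failure mode the new test models guard against. The generated script has roughly the following shape; it is a simplified sketch with illustrative names and approximate statement ordering, not literal dbt output.

    declare dbt_partitions_for_replacement array<date>;

    -- sql_header is emitted once, by the create-table statement
    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
    RETURNS STRING AS (CASE WHEN LOWER(str) = 'a' THEN 'b' ELSE str END);

    -- 1. create a temp table with the model sql
    create or replace table `proj`.`dset`.`my_model__dbt_tmp`
    partition by dt
    as (
        select current_date() as dt, a_to_b(dupe) as dupe
        from `proj`.`dset`.`view_model`
    );

    -- 2. define the partitions to overwrite
    set (dbt_partitions_for_replacement) = (
        select as struct array_agg(distinct dt)
        from `proj`.`dset`.`my_model__dbt_tmp`
    );

    -- 3. run the merge statement (include_sql_header=false: no second UDF declaration)
    merge into `proj`.`dset`.`my_model` as DBT_INTERNAL_DEST
        using (select * from `proj`.`dset`.`my_model__dbt_tmp`) as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched by source
         and DBT_INTERNAL_DEST.dt in unnest(dbt_partitions_for_replacement)
        then delete

    when not matched then insert
        (`dt`, `dupe`)
    values
        (`dt`, `dupe`);

    -- 4. clean up the temp table
    drop table if exists `proj`.`dset`.`my_model__dbt_tmp`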
diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql
new file mode 100644
index 00000000000..467a0d8d7b6
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql
@@ -0,0 +1,31 @@
+
+{#
+    Ensure that the insert overwrite incremental strategy
+    works correctly when a UDF is used in a sql_header. The
+    failure mode here is that dbt might inject the UDF header
+    twice: once for the `create table` and then again for the
+    merge statement.
+#}
+
+{{ config(
+    materialized="incremental",
+    incremental_strategy='insert_overwrite',
+    partition_by={"field": "dt", "data_type": "date"}
+) }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select
+    current_date() as dt,
+    a_to_b(dupe) as dupe
+
+from {{ ref('view_model') }}

diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql
new file mode 100644
index 00000000000..4d760a0fd96
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql
@@ -0,0 +1,32 @@
+
+{#
+    Ensure that the insert overwrite incremental strategy
+    works correctly when a UDF is used in a sql_header. The
+    failure mode here is that dbt might inject the UDF header
+    twice: once for the `create table` and then again for the
+    merge statement.
+#}
+
+{{ config(
+    materialized="incremental",
+    incremental_strategy='insert_overwrite',
+    partition_by={"field": "dt", "data_type": "date"},
+    partitions=["'2020-01-1'"]
+) }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select
+    cast('2020-01-01' as date) as dt,
+    a_to_b(dupe) as dupe
+
+from {{ ref('view_model') }}

diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
index f17ca9bcfcc..11cc58bb67a 100644
--- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py
+++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
@@ -53,7 +53,7 @@ def test__bigquery_simple_run(self):
         self.run_dbt(['seed', '--full-refresh'])
         results = self.run_dbt()
         # Bump expected number of results when adding new model
-        self.assertEqual(len(results), 9)
+        self.assertEqual(len(results), 11)
 
         self.assert_nondupes_pass()
 
@@ -64,7 +64,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun):
     def test_bigquery_run_twice(self):
         self.run_dbt(['seed'])
         results = self.run_dbt()
-        self.assertEqual(len(results), 9)
+        self.assertEqual(len(results), 11)
         results = self.run_dbt()
-        self.assertEqual(len(results), 9)
+        self.assertEqual(len(results), 11)
         self.assert_nondupes_pass()
From 579a6d6064e1796cad0db2071c3eb9bcaceffdee Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Mon, 6 Apr 2020 20:35:48 -0400
Subject: [PATCH 3/3] Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 989b8961221..895852d8b2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 - When a macro is called with invalid arguments, include the calling model in the output ([#2073](https://github.com/fishtown-analytics/dbt/issues/2073), [#2238](https://github.com/fishtown-analytics/dbt/pull/2238))
 - When a warn exception is not in a jinja do block, return an empty string instead of None ([#2222](https://github.com/fishtown-analytics/dbt/issues/2222), [#2259](https://github.com/fishtown-analytics/dbt/pull/2259))
 - Add dbt plugin versions to --version([#2272](https://github.com/fishtown-analytics/dbt/issues/2272), [#2279](https://github.com/fishtown-analytics/dbt/pull/2279))
+- Add support for `sql_header` config in incremental models ([#2136](https://github.com/fishtown-analytics/dbt/issues/2136), [#2200](https://github.com/fishtown-analytics/dbt/pull/2200))
 
 Contributors:
 - [@raalsky](https://github.com/Raalsky) ([#2224](https://github.com/fishtown-analytics/dbt/pull/2224), [#2228](https://github.com/fishtown-analytics/dbt/pull/2228))
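For reference, the user-facing pattern this series enables (a sql_header on an incremental model) mirrors the integration-test models above. A minimal sketch, where the model, function, and column names are illustrative rather than taken from the patch:

    {{ config(materialized='incremental') }}

    {% call set_sql_header(config) %}
        -- declared once per run; available to the create-table and merge statements
        CREATE TEMPORARY FUNCTION normalize_status(s STRING)
        RETURNS STRING AS (LOWER(TRIM(s)));
    {% endcall %}

    select id, normalize_status(status) as status
    from {{ ref('raw_events') }}

The header can also be passed inline as `config(sql_header="...")`; either way, the merge dbt generates on incremental runs now carries it exactly once.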