Skip to content

Commit

Permalink
Fix normalization when un-nesting (#8378)
Browse files Browse the repository at this point in the history
* Remove unique key on exploded nested tables

* un-nest hint

* Regen SQL
  • Loading branch information
ChristopheDuong authored Dec 1, 2021
1 parent 15fe4dd commit c4c92bd
Show file tree
Hide file tree
Showing 55 changed files with 50 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
{% macro sqlserver__cross_join_unnest(stream_name, array_col) -%}
{# https://docs.microsoft.com/en-us/sql/relational-databases/json/convert-json-data-to-rows-and-columns-with-openjson-sql-server?view=sql-server-ver15#option-1---openjson-with-the-default-output #}
CROSS APPLY (
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
WHEN [type] = 5 THEN [value]
END
FROM OPENJSON({{ array_col }})
Expand Down Expand Up @@ -93,21 +93,21 @@

{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, stream_name, column_col) -%}
{{ adapter.dispatch('unnest_cte')(table_name, stream_name, column_col) }}
{% macro unnest_cte(from_table, stream_name, column_col) -%}
{{ adapter.dispatch('unnest_cte')(from_table, stream_name, column_col) }}
{%- endmacro %}

{% macro default__unnest_cte(table_name, stream_name, column_col) -%}{%- endmacro %}
{% macro default__unnest_cte(from_table, stream_name, column_col) -%}{%- endmacro %}

{# -- based on https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ #}
{% macro redshift__unnest_cte(table_name, stream_name, column_col) -%}
{% macro redshift__unnest_cte(from_table, stream_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}
{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_array_length({{ column_col }}, true)) as max_number_of_items
from {{ ref(table_name) }}
from {{ from_table }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
Expand All @@ -123,23 +123,23 @@ joined as (
select
_airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
json_extract_array_element_text({{ column_col }}, numbers.generated_number::int - 1, true) as _airbyte_nested_data
from {{ ref(table_name) }}
from {{ from_table }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
-- to the number of items in {{ from_table }}.{{ column_col }}
where numbers.generated_number <= json_array_length({{ column_col }}, true)
)
{%- endmacro %}

{% macro mysql__unnest_cte(table_name, stream_name, column_col) -%}
{% macro mysql__unnest_cte(from_table, stream_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_length({{ column_col }})) as max_number_of_items
from {{ ref(table_name) }}
from {{ from_table }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
Expand All @@ -157,10 +157,10 @@ joined as (
_airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
{# -- json_extract(column_col, '$[i][0]') as _airbyte_nested_data #}
json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
from {{ ref(table_name) }}
from {{ from_table }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
-- to the number of items in {{ from_table }}.{{ column_col }}
where numbers.generated_number <= json_length({{ column_col }})
)
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }}
{{ unnest_cte('nested_stream_with_complex_columns_resulting_into_long_names_partition', 'partition', 'DATA') }}
{{ unnest_cte(ref('nested_stream_with_complex_columns_resulting_into_long_names_partition'), 'partition', 'DATA') }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value('DATA'), ['currency'], ['currency']) }} as currency,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{{ config(
cluster_by = "_airbyte_emitted_at",
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partition') }}
{{ unnest_cte('nested_stream_with_complex_columns_resulting_into_long_names_partition', 'partition', 'double_array_data') }}
{{ unnest_cte(ref('nested_stream_with_complex_columns_resulting_into_long_names_partition'), 'partition', 'double_array_data') }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value('double_array_data'), ['id'], ['id']) }} as id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{{ config(
cluster_by = "_airbyte_emitted_at",
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@






Expand All @@ -9,14 +8,9 @@
using (
select * from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on
DBT_INTERNAL_SOURCE._airbyte_ab_id = DBT_INTERNAL_DEST._airbyte_ab_id

on FALSE


when matched then update set
`_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid` = DBT_INTERNAL_SOURCE.`_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid`,`double_array_data` = DBT_INTERNAL_SOURCE.`double_array_data`,`DATA` = DBT_INTERNAL_SOURCE.`DATA`,`_airbyte_ab_id` = DBT_INTERNAL_SOURCE.`_airbyte_ab_id`,`_airbyte_emitted_at` = DBT_INTERNAL_SOURCE.`_airbyte_emitted_at`,`_airbyte_normalized_at` = DBT_INTERNAL_SOURCE.`_airbyte_normalized_at`,`_airbyte_partition_hashid` = DBT_INTERNAL_SOURCE.`_airbyte_partition_hashid`


when not matched then insert
(`_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid`, `double_array_data`, `DATA`, `_airbyte_ab_id`, `_airbyte_emitted_at`, `_airbyte_normalized_at`, `_airbyte_partition_hashid`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ from "test_normalization".test_normalization."nested_stream_with_co___long_names
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
CROSS APPLY (
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
WHEN [type] = 5 THEN [value]
END
FROM OPENJSON("DATA")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ from "test_normalization".test_normalization."nested_stream_with_co___long_names
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
CROSS APPLY (
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
SELECT [value] = CASE
WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value]))
WHEN [type] = 5 THEN [value]
END
FROM OPENJSON(double_array_data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{{ config(
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_co___long_names_partition') }}
{{ unnest_cte('nested_stream_with_co___long_names_partition', 'partition', adapter.quote('DATA')) }}
{{ unnest_cte(ref('nested_stream_with_co___long_names_partition'), 'partition', adapter.quote('DATA')) }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value(adapter.quote('DATA')), ['currency'], ['currency']) }} as currency,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_co___long_names_partition') }}
{{ unnest_cte('nested_stream_with_co___long_names_partition', 'partition', 'double_array_data') }}
{{ unnest_cte(ref('nested_stream_with_co___long_names_partition'), 'partition', 'double_array_data') }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value('double_array_data'), ['id'], ['id']) }} as id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{{ config(
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@

delete
from "test_normalization".test_normalization."nested_stream_with_co___long_names_partition"
where (_airbyte_ab_id) in (
select (_airbyte_ab_id)
from "test_normalization".test_normalization."#nested_stream_with_co___long_names_partition__dbt_tmp"
);


insert into "test_normalization".test_normalization."nested_stream_with_co___long_names_partition" ("_airbyte_nested_strea__nto_long_names_hashid", "double_array_data", "DATA", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_partition_hashid")
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ with numbers as (
from test_normalization.`nested_stream_with_co___long_names_partition`
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in nested_stream_with_co___long_names_partition.`DATA`
-- to the number of items in test_normalization.`nested_stream_with_co___long_names_partition`.`DATA`
where numbers.generated_number <= json_length(`DATA`)
)
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ with numbers as (
from test_normalization.`nested_stream_with_co___long_names_partition`
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in nested_stream_with_co___long_names_partition.double_array_data
-- to the number of items in test_normalization.`nested_stream_with_co___long_names_partition`.double_array_data
where numbers.generated_number <= json_length(double_array_data)
)
select
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{{ config(
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_co___long_names_partition') }}
{{ unnest_cte('nested_stream_with_co___long_names_partition', 'partition', 'double_array_data') }}
{{ unnest_cte(ref('nested_stream_with_co___long_names_partition'), 'partition', 'double_array_data') }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value('double_array_data'), ['id'], ['id']) }} as id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_co___long_names_partition') }}
{{ unnest_cte('nested_stream_with_co___long_names_partition', 'partition', adapter.quote('DATA')) }}
{{ unnest_cte(ref('nested_stream_with_co___long_names_partition'), 'partition', adapter.quote('DATA')) }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value(adapter.quote('DATA')), ['currency'], ['currency']) }} as currency,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{{ config(
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ with numbers as (
from test_normalization.`nested_stream_with_co___long_names_partition`
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in nested_stream_with_co___long_names_partition.`DATA`
-- to the number of items in test_normalization.`nested_stream_with_co___long_names_partition`.`DATA`
where numbers.generated_number <= json_length(`DATA`)
)
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ with numbers as (
from test_normalization.`nested_stream_with_co___long_names_partition`
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in nested_stream_with_co___long_names_partition.double_array_data
-- to the number of items in test_normalization.`nested_stream_with_co___long_names_partition`.double_array_data
where numbers.generated_number <= json_length(double_array_data)
)
select
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_c___long_names_partition') }}
{{ unnest_cte('nested_stream_with_c___long_names_partition', 'partition', adapter.quote('DATA')) }}
{{ unnest_cte(ref('nested_stream_with_c___long_names_partition'), 'partition', adapter.quote('DATA')) }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value(adapter.quote('DATA')), ['currency'], ['currency']) }} as currency,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('nested_stream_with_c___long_names_partition') }}
{{ unnest_cte('nested_stream_with_c___long_names_partition', 'partition', 'double_array_data') }}
{{ unnest_cte(ref('nested_stream_with_c___long_names_partition'), 'partition', 'double_array_data') }}
select
_airbyte_partition_hashid,
{{ json_extract_scalar(unnested_column_value('double_array_data'), ['id'], ['id']) }} as {{ adapter.quote('id') }},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('unnest_alias_children_owner') }}
{{ unnest_cte('unnest_alias_children_owner', 'owner', adapter.quote('column`_\'with""_quotes')) }}
{{ unnest_cte(ref('unnest_alias_children_owner'), 'owner', adapter.quote('column`_\'with""_quotes')) }}
select
_airbyte_owner_hashid,
{{ json_extract_scalar(unnested_column_value(adapter.quote('column`_\'with""_quotes')), ['currency'], ['currency']) }} as currency,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ ref('unnest_alias') }}
{{ unnest_cte('unnest_alias', 'unnest_alias', 'children') }}
{{ unnest_cte(ref('unnest_alias'), 'unnest_alias', 'children') }}
select
_airbyte_unnest_alias_hashid,
{{ json_extract_scalar(unnested_column_value('children'), ['ab_id'], ['ab_id']) }} as ab_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "nested-intermediate" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'hash'}],
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
tags = [ "nested" ]
) }}
Expand Down
Loading

0 comments on commit c4c92bd

Please sign in to comment.