Skip to content

Commit

Permalink
Normalization: Revert to protocol v0 (#22283)
Browse files Browse the repository at this point in the history
* Revert "Normalization: handle non-object top-level schemas; treat binary data as string (#22165)"

This reverts commit 8276d03.

* Revert "Normalization: check for ref type existence (#22161)"

This reverts commit dbe56d6.

* Revert "🎉Updated normalization to handle new datatypes (#19721)"

This reverts commit c1d7736.

* revert dest definitions

* also dockerfile

* re-add to changelog

* add comment in dockerfile
  • Loading branch information
edgao authored Feb 6, 2023
1 parent 3e010bd commit 517fc6a
Show file tree
Hide file tree
Showing 237 changed files with 1,051 additions and 4,195 deletions.
4 changes: 3 additions & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.3.2
# 0.3.x is tombstoned.
# The next minor bump should go directly to 0.4.0
LABEL io.airbyte.version=0.2.25
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@
string
{% endmacro %}

{#
  Cross-database entry point for the binary column type.
  Takes no arguments; it forwards to whichever `type_binary` implementation
  adapter.dispatch resolves and emits that macro's output.
#}
{%- macro type_binary() -%}
{{ adapter.dispatch('type_binary')() }}
{%- endmacro -%}

{#
  `default__` variant of type_binary: emits the type name `binary`.
  Under dbt's dispatch naming convention this is the implementation picked
  when no adapter-prefixed `<adapter>__type_binary` macro applies.
#}
{%- macro default__type_binary() -%}
binary
{%- endmacro -%}

{%- macro redshift__type_json() -%}
{%- if redshift_super_type() -%}
super
Expand Down Expand Up @@ -80,28 +72,6 @@
char(1000)
{%- endmacro -%}

{# binary data ------------------------------------------------- #}

{# postgres-prefixed variant of type_binary: emits the type name `bytea`. #}
{%- macro postgres__type_binary() -%}
bytea
{%- endmacro -%}

{# bigquery-prefixed variant of type_binary: emits the type name `bytes`. #}
{%- macro bigquery__type_binary() -%}
bytes
{%- endmacro -%}

{# mssql-prefixed variant of type_binary: emits `VARBINARY(MAX)`. #}
{%- macro mssql__type_binary() -%}
VARBINARY(MAX)
{%- endmacro -%}

{# snowflake-prefixed variant of type_binary: emits `VARBINARY` (no length given). #}
{%- macro snowflake__type_binary() -%}
VARBINARY
{%- endmacro -%}

{#
  clickhouse-prefixed variant of type_binary: emits `VARBINARY`.
  NOTE(review): this is the same literal the snowflake variant emits; confirm
  that the targeted ClickHouse version accepts VARBINARY as a type name.
#}
{%- macro clickhouse__type_binary() -%}
VARBINARY
{%- endmacro -%}

{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
float
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ select
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -75,7 +74,6 @@ select
cast(nullif(time_no_tz, '') as
time
) as time_no_tz,
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -113,8 +111,6 @@ select
string
), ''), '-', coalesce(cast(time_no_tz as
string
), ''), '-', coalesce(cast(property_binary_data as
string
), '')) as
string
))) as _airbyte_exchange_rate_hashid,
Expand All @@ -138,7 +134,6 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,6 @@
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
post_hook = ["
{%
set scd_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='exchange_rate_scd'
)
%}
{%
if scd_table_relation is not none
%}
{%
do adapter.drop_relation(scd_table_relation)
%}
{% endif %}
"],
tags = [ "top-level" ]
) }}
-- Final base SQL model
Expand All @@ -37,7 +21,6 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,6 @@
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
post_hook = ["
{%
set scd_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='exchange_rate_scd'
)
%}
{%
if scd_table_relation is not none
%}
{%
do adapter.drop_relation(scd_table_relation)
%}
{% endif %}
"],
tags = [ "top-level" ]
) }}
-- Final base SQL model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ select
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -75,7 +74,6 @@ select
cast(nullif(time_no_tz, '') as
time
) as time_no_tz,
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -113,8 +111,6 @@ select
string
), ''), '-', coalesce(cast(time_no_tz as
string
), ''), '-', coalesce(cast(property_binary_data as
string
), '')) as
string
))) as _airbyte_exchange_rate_hashid,
Expand All @@ -138,7 +134,6 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab1
create view _airbyte_test_normalization.dedup_exchange_rate_ab1__dbt_tmp

as (

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab2
create view _airbyte_test_normalization.dedup_exchange_rate_ab2__dbt_tmp

as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: _airbyte_test_normalization.dedup_exchange_rate_ab1
select
accurateCastOrNull(trim(BOTH '"' from id), '
accurateCastOrNull(id, '
BIGINT
') as id,
nullif(accurateCastOrNull(trim(BOTH '"' from currency), 'String'), 'null') as currency,
toDate(parseDateTimeBestEffortOrNull(trim(BOTH '"' from nullif(date, '')))) as date,
parseDateTime64BestEffortOrNull(trim(BOTH '"' from nullif(timestamp_col, ''))) as timestamp_col,
accurateCastOrNull(trim(BOTH '"' from "HKD@spéçiäl & characters"), '
accurateCastOrNull("HKD@spéçiäl & characters", '
Float64
') as "HKD@spéçiäl & characters",
nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), 'String'), 'null') as HKD_special___characters,
accurateCastOrNull(trim(BOTH '"' from NZD), '
accurateCastOrNull(NZD, '
Float64
') as NZD,
accurateCastOrNull(trim(BOTH '"' from USD), '
accurateCastOrNull(USD, '
Float64
') as USD,
_airbyte_ab_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@







insert into test_normalization.dedup_cdc_excluded_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid")
Expand Down Expand Up @@ -103,4 +101,4 @@ select
_airbyte_dedup_cdc_excluded_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@







insert into test_normalization.dedup_exchange_rate_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
Expand Down Expand Up @@ -107,4 +105,4 @@ select
_airbyte_dedup_exchange_rate_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@







insert into test_normalization.renamed_dedup_cdc_excluded_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid")
Expand Down Expand Up @@ -89,4 +87,4 @@ select
_airbyte_renamed_dedup_cdc_excluded_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@







insert into test_normalization.dedup_exchange_rate ("_airbyte_unique_key", "id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
Expand All @@ -28,4 +26,4 @@ where 1 = 1
and _airbyte_active_row = 1



Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@







insert into test_normalization.renamed_dedup_cdc_excluded ("_airbyte_unique_key", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid")
Expand All @@ -22,4 +20,4 @@ where 1 = 1
and _airbyte_active_row = 1



Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@





insert into test_normalization.exchange_rate ("id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "column___with__quotes", "datetime_tz", "datetime_no_tz", "time_tz", "time_no_tz", "property_binary_data", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_exchange_rate_hashid")
insert into test_normalization.exchange_rate__dbt_tmp ("id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "column___with__quotes", "datetime_tz", "datetime_no_tz", "time_tz", "time_no_tz", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_exchange_rate_hashid")

-- Final base SQL model
-- depends_on: _airbyte_test_normalization.exchange_rate_ab3
Expand All @@ -21,7 +19,6 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.dedup_exchange_rate_stg
create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp

as (

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.multiple_column_names_conflicts_stg
create view _airbyte_test_normalization.multiple_column_names_conflicts_stg__dbt_tmp

as (

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('dedup_exchange_rate_ab1') }}
select
accurateCastOrNull(trim(BOTH '"' from id), '{{ dbt_utils.type_bigint() }}') as id,
accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id,
nullif(accurateCastOrNull(trim(BOTH '"' from currency), '{{ dbt_utils.type_string() }}'), 'null') as currency,
toDate(parseDateTimeBestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('date') }}))) as date,
parseDateTime64BestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('timestamp_col') }})) as timestamp_col,
accurateCastOrNull(trim(BOTH '"' from {{ quote('HKD@spéçiäl & characters') }}), '{{ dbt_utils.type_float() }}') as {{ quote('HKD@spéçiäl & characters') }},
accurateCastOrNull({{ quote('HKD@spéçiäl & characters') }}, '{{ dbt_utils.type_float() }}') as {{ quote('HKD@spéçiäl & characters') }},
nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), '{{ dbt_utils.type_string() }}'), 'null') as HKD_special___characters,
accurateCastOrNull(trim(BOTH '"' from NZD), '{{ dbt_utils.type_float() }}') as NZD,
accurateCastOrNull(trim(BOTH '"' from USD), '{{ dbt_utils.type_float() }}') as USD,
accurateCastOrNull(NZD, '{{ dbt_utils.type_float() }}') as NZD,
accurateCastOrNull(USD, '{{ dbt_utils.type_float() }}') as USD,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
Expand Down
Loading

0 comments on commit 517fc6a

Please sign in to comment.