🚨 Snowflake produces permanent tables 🚨 #9063

Merged Jan 6, 2022 · 38 commits (changes shown from 31 commits)
c6ae621 add normalization-clickhouse docker build step (jzcruiser, Dec 22, 2021)
b88225e Merge branch 'patch-4' of github.com:jzcruiser/airbyte into marcos/te… (marcosmarxm, Dec 23, 2021)
cc499c2 bump normalization version (marcosmarxm, Dec 23, 2021)
a2517a3 small changes gradle (marcosmarxm, Dec 23, 2021)
4a51799 Merge branch 'master' into marcos/test-pr-9029 (marcosmarxm, Dec 23, 2021)
a57495c fix settings gradle (marcosmarxm, Dec 27, 2021)
c56ef54 fix eof file (marcosmarxm, Dec 27, 2021)
f8ccfd6 correct clickhouse normalization (marcosmarxm, Dec 29, 2021)
f24ea4b Merge branch 'master' into marcos/test-pr-9029 (edgao, Jan 4, 2022)
a6e4c31 Refactor jinja template for scd (#9278) (ChristopheDuong, Jan 4, 2022)
4f9f8ae merge chris code and regenerate sql files (marcosmarxm, Jan 4, 2022)
621ae20 add snowflake as copy of standard (edgao, Dec 22, 2021)
d0ca72a snowflake creates permanent tables (edgao, Dec 22, 2021)
3aef7c9 dockerfile respects updated dbt_project.yml (edgao, Dec 22, 2021)
05b0b0f add to docker-compose (edgao, Dec 22, 2021)
5a9bacd hook up custom normalization image (edgao, Dec 22, 2021)
f7c2d9b add to test (edgao, Dec 22, 2021)
8c96d2d more fixes? (edgao, Dec 22, 2021)
a0e7399 build.gradle; some sort of test? (edgao, Dec 22, 2021)
c0a755b add to integration test (edgao, Dec 23, 2021)
4f92a66 case-sensitive patterns (edgao, Dec 23, 2021)
be09211 handle m1 error (edgao, Dec 23, 2021)
93c0b15 more snowflake-specific handling (edgao, Dec 23, 2021)
0bb000c ran tests (edgao, Dec 24, 2021)
7e73552 docs + version bumps (edgao, Dec 24, 2021)
2f7dc29 inject :dev normalization version during test (edgao, Dec 24, 2021)
b31f5ec try hardcoding :dev image (edgao, Dec 27, 2021)
405d038 add destination variable (edgao, Dec 29, 2021)
0c55399 regenerate test output (edgao, Dec 29, 2021)
b8db991 typo (edgao, Dec 30, 2021)
d8abb0c exclude snowflake dbt template from spotless (edgao, Dec 30, 2021)
baf0db2 clarify documentation (edgao, Jan 5, 2022)
fe10d82 Merge branch 'master' into edgao/snowflake_permanent_tables (edgao, Jan 5, 2022)
01bcb67 regenerate normalization_test_output (edgao, Jan 5, 2022)
ab662e3 delete unused variable (edgao, Jan 6, 2022)
03e38dc minor bump actually (edgao, Jan 6, 2022)
dffe792 bump definition (edgao, Jan 6, 2022)
663b498 Merge branch 'master' into edgao/snowflake_permanent_tables (edgao, Jan 6, 2022)
2 changes: 2 additions & 0 deletions airbyte-integrations/bases/base-normalization/.dockerignore
@@ -8,3 +8,5 @@
!dbt-project-template-mssql
!dbt-project-template-mysql
!dbt-project-template-oracle
!dbt-project-template-clickhouse
!dbt-project-template-snowflake
11 changes: 11 additions & 0 deletions airbyte-integrations/bases/base-normalization/.gitignore
@@ -20,18 +20,29 @@ integration_tests/normalization_test_output/**/*.yml
# Simple Streams
!integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql
!integration_tests/normalization_test_output/**/exchange_rate.sql
!integration_tests/normalization_test_output/**/DEDUP_EXCHANGE_RATE*.sql
!integration_tests/normalization_test_output/**/EXCHANGE_RATE.sql
# Nested Streams
# Parent table
!integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_names.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_NAMES_AB*.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_NAMES_SCD.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_NAMES.sql
# Nested table
!integration_tests/normalization_test_output/**/nested_stream_with_*_partition_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with_*_data_ab1.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data_scd.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_partition.sql
!integration_tests/normalization_test_output/**/nested_stream_with*_data.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH_*_PARTITION_AB1.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH_*_DATA_AB1.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_PARTITION_SCD.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_DATA_SCD.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_PARTITION.sql
!integration_tests/normalization_test_output/**/NESTED_STREAM_WITH*_DATA.sql

# but we keep all sql files for Postgres
!integration_tests/normalization_test_output/postgres/**/*.sql
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.name=airbyte/normalization
11 changes: 11 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -69,10 +69,20 @@ task airbyteDockerOracle(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('oracle')
dependsOn assemble
}
task airbyteDockerClickhouse(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('clickhouse')
dependsOn assemble
}
task airbyteDockerSnowflake(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('snowflake')
dependsOn assemble
}

airbyteDocker.dependsOn(airbyteDockerMSSql)
airbyteDocker.dependsOn(airbyteDockerMySql)
airbyteDocker.dependsOn(airbyteDockerOracle)
airbyteDocker.dependsOn(airbyteDockerClickhouse)
airbyteDocker.dependsOn(airbyteDockerSnowflake)

task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) {
module = "pytest"
@@ -86,6 +96,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -0,0 +1,67 @@
# This file is necessary to install dbt-utils with dbt deps
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "airbyte_utils"
version: "1.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project. Profiles contain
# database connection information, and should be configured in the ~/.dbt/profiles.yml file
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
source-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
modules-path: "/tmp/dbt_modules" # directory which will store external DBT dependencies

clean-targets: # directories to be removed by `dbt clean`
- "build"
- "dbt_modules"

quoting:
database: true
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
# all schemas should be unquoted
schema: false
identifier: true

# You can define configurations for models in the `source-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
+transient: false
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_incremental:
+tags: incremental_tables
+materialized: incremental
+on_schema_change: sync_all_columns
airbyte_tables:
+tags: normalized_tables
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

dispatch:
- macro_namespace: dbt_utils
search_order: ["airbyte_utils", "dbt_utils"]

vars:
destination: "snowflake"
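The operative line in this template is `+transient: false` at the top of the `models:` block, which is what gives the PR its title: dbt creates Snowflake tables as transient by default, and transient tables are excluded from Snowflake's Fail-safe retention. A rough sketch of the resulting DDL difference (hypothetical table name, not taken from this PR):

```sql
-- dbt's default on Snowflake (transient: true): no Fail-safe coverage
CREATE OR REPLACE TRANSIENT TABLE my_schema.exchange_rate AS SELECT ...;

-- With `+transient: false`, as in this template: a permanent table,
-- covered by Snowflake Time Travel and Fail-safe retention
CREATE OR REPLACE TABLE my_schema.exchange_rate AS SELECT ...;
```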
Review comment from a Contributor:
Why do we need to supply env vars at runtime for dbt?

When normalization runs to generate the dbt project/files, we already know we are doing it for a certain destination; isn't that enough?

@@ -36,3 +36,10 @@ services:
context: .
labels:
io.airbyte.git-revision: ${GIT_REVISION}
normalization-snowflake:
image: airbyte/normalization-snowflake:${VERSION}
build:
dockerfile: snowflake.Dockerfile
context: .
labels:
io.airbyte.git-revision: ${GIT_REVISION}
@@ -10,3 +10,7 @@ services:
image: airbyte/normalization-mysql:${VERSION}
normalization-oracle:
image: airbyte/normalization-oracle:${VERSION}
normalization-clickhouse:
image: airbyte/normalization-clickhouse:${VERSION}
normalization-snowflake:
image: airbyte/normalization-snowflake:${VERSION}
@@ -374,6 +374,8 @@ def get_normalization_image(destination_type: DestinationType) -> str:
return "airbyte/normalization-oracle:dev"
elif DestinationType.CLICKHOUSE.value == destination_type.value:
return "airbyte/normalization-clickhouse:dev"
elif DestinationType.SNOWFLAKE.value == destination_type.value:
return "airbyte/normalization-snowflake:dev"
else:
return "airbyte/normalization:dev"
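The `elif` chain above simply maps each destination to its custom normalization image, falling back to the base image. A simplified, dict-based sketch of the same logic (the real function takes a `DestinationType` enum rather than a plain string):

```python
# Simplified sketch of the image-selection logic in the diff above.
NORMALIZATION_IMAGES = {
    "clickhouse": "airbyte/normalization-clickhouse:dev",
    "oracle": "airbyte/normalization-oracle:dev",
    "snowflake": "airbyte/normalization-snowflake:dev",  # added by this PR
}

def get_normalization_image(destination_type: str) -> str:
    # Destinations without a custom normalization image fall back to the base image.
    return NORMALIZATION_IMAGES.get(destination_type, "airbyte/normalization:dev")

print(get_normalization_image("snowflake"))  # airbyte/normalization-snowflake:dev
print(get_normalization_image("postgres"))   # airbyte/normalization:dev
```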

@@ -445,6 +447,8 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
"Configuration paths exist in your dbt_project.yml", # When no cte / view are generated
"Error loading config file: .dockercfg: $HOME is not defined", # ignore warning
"depends on a node named 'disabled_test' which was not found", # Tests throwing warning because it is disabled
"The requested image's platform (linux/amd64) does not match the detected host platform "
+ "(linux/arm64/v8) and no specific platform was requested", # temporary patch until we publish images for arm64
]:
if except_clause in str_line:
is_exception = True
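The clause added here lets tests pass on Apple M1 hosts by treating Docker's linux/amd64-vs-arm64 platform warning as benign until arm64 images are published. A minimal sketch of this allowlist check (simplified; the real function also tracks dbt's exit status):

```python
# Known-benign dbt/docker output lines; a line containing any of these
# substrings is not treated as a test failure. (Simplified from the diff above.)
IGNORED_CLAUSES = [
    "Configuration paths exist in your dbt_project.yml",
    "Error loading config file: .dockercfg: $HOME is not defined",
    "depends on a node named 'disabled_test' which was not found",
    "The requested image's platform (linux/amd64) does not match the detected host platform",
]

def is_ignorable(line: str) -> bool:
    # Substring match, mirroring the `except_clause in str_line` check above.
    return any(clause in line for clause in IGNORED_CLAUSES)

print(is_ignorable("WARNING: The requested image's platform (linux/amd64) "
                   "does not match the detected host platform (linux/arm64/v8)"))  # True
print(is_ignorable("Database Error in model exchange_rate"))  # False
```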
@@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_cdc_excluded_ab3
from _airbyte_test_normalization.dedup_cdc_excluded_stg
-- dedup_cdc_excluded from test_normalization._airbyte_raw_dedup_cdc_excluded
),

@@ -45,15 +46,15 @@ scd_data as (
_ab_cdc_updated_at,
_ab_cdc_deleted_at,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc, _ab_cdc_updated_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_cdc_excluded_hashid
@@ -65,7 +66,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String')
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

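The ordering change in `dedup_data` (`order by _airbyte_active_row desc, _airbyte_ab_id`) guarantees that when several rows share a unique key, the active row ranks first and therefore survives the downstream `_airbyte_row_num = 1` filter. A small Python simulation of that ranking, with hypothetical data:

```python
# Simulate: row_number() over (partition by key
#                              order by _airbyte_active_row desc, _airbyte_ab_id)
rows = [
    {"_airbyte_ab_id": "a1", "_airbyte_active_row": 0},
    {"_airbyte_ab_id": "a2", "_airbyte_active_row": 1},
    {"_airbyte_ab_id": "a3", "_airbyte_active_row": 0},
]
# Negate the active flag so descending order comes first; tie-break on ab_id ascending.
ranked = sorted(rows, key=lambda r: (-r["_airbyte_active_row"], r["_airbyte_ab_id"]))
row_num = {r["_airbyte_ab_id"]: i + 1 for i, r in enumerate(ranked)}
print(row_num)  # {'a2': 1, 'a1': 2, 'a3': 3} -- the active row gets row_num 1
```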
@@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_exchange_rate_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_exchange_rate_ab3
from _airbyte_test_normalization.dedup_exchange_rate_stg
-- dedup_exchange_rate from test_normalization._airbyte_raw_dedup_exchange_rate
),

@@ -54,15 +55,15 @@ scd_data as (
NZD,
USD,
date as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(date) over (
partition by id, currency, cast(NZD as String)
order by
date is null asc,
date desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
@@ -74,7 +75,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

@@ -12,11 +12,12 @@

as (

-- depends_on: ref('renamed_dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg
-- renamed_dedup_cdc_excluded from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
),

@@ -41,15 +42,15 @@ scd_data as (
))) as _airbyte_unique_key,
id,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_renamed_dedup_cdc_excluded_hashid
@@ -61,7 +62,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

@@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
@@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_exchange_rate_scd
select
_airbyte_unique_key,
id,
@@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.renamed_dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
@@ -14,6 +14,7 @@
with __dbt__cte__exchange_rate_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_exchange_rate
select
JSONExtractRaw(_airbyte_data, 'id') as id,
JSONExtractRaw(_airbyte_data, 'currency') as currency,
@@ -33,6 +34,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__exchange_rate_ab1
select
accurateCastOrNull(id, '
BIGINT
@@ -60,6 +62,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__exchange_rate_ab2
select
assumeNotNull(hex(MD5(

@@ -95,6 +98,7 @@ from __dbt__cte__exchange_rate_ab2 tmp
-- exchange_rate
where 1 = 1
)-- Final base SQL model
-- depends_on: __dbt__cte__exchange_rate_ab3
select
id,
currency,