Skip to content

Commit

Permalink
get archive blocks working. Using temp tables is extremely slow.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Mar 25, 2019
1 parent 1b39793 commit 596b0c7
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@

{% macro default__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }}
set {{ adapter.quote('valid_to') }} = tmp.{{ adapter.quote('valid_to') }}
set {{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('scd_id') }} = {{ target_relation }}.{{ adapter.quote('scd_id') }}
where tmp.{{ adapter.quote('dbt_scd_id') }} = {{ target_relation }}.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
{% endmacro %}

Expand All @@ -73,7 +73,7 @@
{% endfor %},
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ updated_at }} as {{ adapter.quote('valid_from') }},
{{ updated_at }} as {{ adapter.quote('dbt_valid_from') }},
{{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }}
from source
),
Expand All @@ -86,8 +86,8 @@
{% endfor %}
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ adapter.quote('valid_from') }},
{{ adapter.quote('valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
{{ adapter.quote('dbt_valid_from') }},
{{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
from {{ target_relation }}

),
Expand All @@ -96,7 +96,7 @@

select
current_data.*,
{{ timestamp_column.literal('null') }} as {{ adapter.quote('valid_to') }}
{{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }}
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
Expand All @@ -111,7 +111,7 @@

select
archived_data.*,
current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('valid_to') }}
current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('dbt_valid_to') }}
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
Expand All @@ -129,27 +129,22 @@
)

select *,
{{ archive_scd_hash() }} as {{ adapter.quote('scd_id') }}
{{ archive_scd_hash() }} as {{ adapter.quote('dbt_scd_id') }}
from merged

{% endmacro %}


{# this is gross #}
{% macro create_empty_view_as(sql) %}
{% macro create_empty_table_as(sql) %}
{% set tmp_relation = api.Relation.create(identifier=model['name']+'_dbt_archival_view_tmp', type='view') %}
{% call statement('_') %}
{{ drop_relation(tmp_relation) }}
{% endcall %}
{% call statement('_') %}
{% set limited_sql %}
with cte as (
{{ sql }}
)
select * from cte limit 0
{% endset %}
{{ create_view_as(tmp_relation, limited_sql) }}
{% endcall %}
{% set limited_sql -%}
with cte as (
{{ sql }}
)
select * from cte limit 0
{%- endset %}
{%- set tmp_relation = create_temporary_table(limited_sql, tmp_relation) -%}

{{ return(tmp_relation) }}

Expand Down Expand Up @@ -196,7 +191,7 @@
{{ exceptions.relation_wrong_type(target_relation, 'table') }}
{%- endif -%}

{% set source_info_model = create_empty_view_as(model['raw_sql']) %}
{% set source_info_model = create_empty_table_as(model['injected_sql']) %}

{%- set source_columns = adapter.get_columns_in_relation(source_info_model) -%}

Expand Down Expand Up @@ -225,7 +220,7 @@
{% set tmp_table_sql -%}

with dbt_archive_sbq as (
{{ archive_select(model['raw_sql'], target_relation, source_columns, unique_key, updated_at) }}
{{ archive_select(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }}
)
select * from dbt_archive_sbq

Expand Down
19 changes: 15 additions & 4 deletions core/dbt/parser/archives.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dbt.node_types import NodeType
from dbt.parser.base import MacrosKnownParser
from dbt.parser.base_sql import BaseSqlParser, SQLParseResult

from dbt.adapters.factory import get_adapter
import dbt.clients.jinja
import dbt.exceptions
import dbt.utils
Expand All @@ -24,7 +24,7 @@ def parse_archives_from_project(cls, config):

for table in tables:
cfg = table.copy()
cfg['source_database'] = archive_config.get(
source_database = archive_config.get(
'source_database',
config.credentials.database
)
Expand All @@ -33,11 +33,22 @@ def parse_archives_from_project(cls, config):
config.credentials.database
)

cfg['source_schema'] = archive_config.get('source_schema')
source_schema = archive_config['source_schema']
cfg['target_schema'] = archive_config.get('target_schema')

fake_path = [cfg['target_database'], cfg['target_schema'],
cfg['target_table']]

relation = get_adapter(config).Relation.create(
database=source_database,
schema=source_schema,
identifier=table['source_table'],
type='table'
)

raw_sql = '{{ config(materialized="archive") }}' + \
'select * from {!s}'.format(relation)

archives.append({
'name': table.get('target_table'),
'root_path': config.project_root,
Expand All @@ -46,7 +57,7 @@ def parse_archives_from_project(cls, config):
'original_file_path': 'dbt_project.yml',
'package_name': config.project_name,
'config': cfg,
'raw_sql': '{{config(materialized="archive")}} -- noop'
'raw_sql': raw_sql
})

return archives
Expand Down
3 changes: 3 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def is_cancelable(cls):
return False

def drop_relation(self, relation):
### TEMP HACK
if relation.database is None and relation.schema is None:
return
is_cached = self._schema_is_cached(relation.database, relation.schema)
if is_cached:
self.cache.drop(relation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

{% macro bigquery__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }} as dest
set dest.{{ adapter.quote('valid_to') }} = tmp.{{ adapter.quote('valid_to') }}
set dest.{{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('scd_id') }} = dest.{{ adapter.quote('scd_id') }}
where tmp.{{ adapter.quote('dbt_scd_id') }} = dest.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
{% endmacro %}
2 changes: 1 addition & 1 deletion plugins/redshift/dbt/include/redshift/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
{{ column_list_for_create_table(columns) }}
)
{{ dist('dbt_updated_at') }}
{{ sort('compound', ['scd_id']) }};
{{ sort('compound', ['dbt_scd_id']) }};
{%- endmacro %}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ where id >= 10 and id <= 20;

-- invalidate records 10 - 20
update {database}.{schema}.archive_expected set
valid_to = timestamp_add(updated_at, interval 1 hour)
dbt_valid_to = timestamp_add(updated_at, interval 1 hour)
where id >= 10 and id <= 20;
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ where "id" >= 10 and "id" <= 20;

-- invalidate records 10 - 20
update {schema}.archive_expected set
"valid_to" = "updated_at" + interval '1 hour'
"dbt_valid_to" = "updated_at" + interval '1 hour'
where "id" >= 10 and "id" <= 20;
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ where "id" >= 10 and "id" <= 20;

-- invalidate records 10 - 20
update {database}.{schema}.archive_expected set
"valid_to" = DATEADD(hour, 1, "updated_at")
"dbt_valid_to" = DATEADD(hour, 1, "updated_at")
where "id" >= 10 and "id" <= 20;
18 changes: 9 additions & 9 deletions test/integration/004_simple_archive_test/seed.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ create table {database}.{schema}.archive_expected (

-- archival fields
"updated_at" TIMESTAMP WITHOUT TIME ZONE,
"valid_from" TIMESTAMP WITHOUT TIME ZONE,
"valid_to" TIMESTAMP WITHOUT TIME ZONE,
"scd_id" VARCHAR(255),
"dbt_valid_from" TIMESTAMP WITHOUT TIME ZONE,
"dbt_valid_to" TIMESTAMP WITHOUT TIME ZONE,
"dbt_scd_id" VARCHAR(255),
"dbt_updated_at" TIMESTAMP WITHOUT TIME ZONE
);

Expand Down Expand Up @@ -58,10 +58,10 @@ insert into {database}.{schema}.archive_expected (
"gender",
"ip_address",
"updated_at",
"valid_from",
"valid_to",
"dbt_valid_from",
"dbt_valid_to",
"dbt_updated_at",
"scd_id"
"dbt_scd_id"
)

select
Expand All @@ -73,8 +73,8 @@ select
"ip_address",
"updated_at",
-- fields added by archival
"updated_at" as valid_from,
null::timestamp as valid_to,
"updated_at" as dbt_valid_from,
null::timestamp as dbt_valid_to,
"updated_at" as dbt_updated_at,
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as scd_id
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as dbt_scd_id
from {database}.{schema}.seed;
18 changes: 9 additions & 9 deletions test/integration/004_simple_archive_test/seed_bq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ create table {database}.{schema}.archive_expected (

-- archival fields
`updated_at` TIMESTAMP,
`valid_from` TIMESTAMP,
`valid_to` TIMESTAMP,
`scd_id` STRING,
`dbt_valid_from` TIMESTAMP,
`dbt_valid_to` TIMESTAMP,
`dbt_scd_id` STRING,
`dbt_updated_at` TIMESTAMP
);

Expand Down Expand Up @@ -58,10 +58,10 @@ insert {database}.{schema}.archive_expected (
`gender`,
`ip_address`,
`updated_at`,
`valid_from`,
`valid_to`,
`dbt_valid_from`,
`dbt_valid_to`,
`dbt_updated_at`,
`scd_id`
`dbt_scd_id`
)

select
Expand All @@ -73,9 +73,9 @@ select
`ip_address`,
`updated_at`,
-- fields added by archival
`updated_at` as valid_from,
cast(null as timestamp) as valid_to,
`updated_at` as dbt_valid_from,
cast(null as timestamp) as dbt_valid_to,
`updated_at` as dbt_updated_at,
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as scd_id
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as dbt_scd_id
from {database}.{schema}.seed;

24 changes: 12 additions & 12 deletions test/integration/004_simple_archive_test/update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ insert into {database}.{schema}.archive_expected (
"gender",
"ip_address",
"updated_at",
"valid_from",
"valid_to",
"dbt_valid_from",
"dbt_valid_to",
"dbt_updated_at",
"scd_id"
"dbt_scd_id"
)

select
Expand All @@ -23,10 +23,10 @@ select
"ip_address",
"updated_at",
-- fields added by archival
"updated_at" as "valid_from",
null::timestamp as "valid_to",
"updated_at" as "dbt_valid_from",
null::timestamp as "dbt_valid_to",
"updated_at" as "dbt_updated_at",
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "scd_id"
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id"
from {database}.{schema}.seed
where "id" >= 10 and "id" <= 20;

Expand Down Expand Up @@ -54,10 +54,10 @@ insert into {database}.{schema}.archive_expected (
"gender",
"ip_address",
"updated_at",
"valid_from",
"valid_to",
"dbt_valid_from",
"dbt_valid_to",
"dbt_updated_at",
"scd_id"
"dbt_scd_id"
)

select
Expand All @@ -69,9 +69,9 @@ select
"ip_address",
"updated_at",
-- fields added by archival
"updated_at" as "valid_from",
null::timestamp as "valid_to",
"updated_at" as "dbt_valid_from",
null::timestamp as "dbt_valid_to",
"updated_at" as "dbt_updated_at",
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "scd_id"
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id"
from {database}.{schema}.seed
where "id" > 20;
24 changes: 12 additions & 12 deletions test/integration/004_simple_archive_test/update_bq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ insert {database}.{schema}.archive_expected (
`gender`,
`ip_address`,
`updated_at`,
`valid_from`,
`valid_to`,
`dbt_valid_from`,
`dbt_valid_to`,
`dbt_updated_at`,
`scd_id`
`dbt_scd_id`
)

select
Expand All @@ -23,10 +23,10 @@ select
`ip_address`,
`updated_at`,
-- fields added by archival
`updated_at` as `valid_from`,
cast(null as timestamp) as `valid_to`,
`updated_at` as `dbt_valid_from`,
cast(null as timestamp) as `dbt_valid_to`,
`updated_at` as `dbt_updated_at`,
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `scd_id`
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `dbt_scd_id`
from {database}.{schema}.seed
where `id` >= 10 and `id` <= 20;

Expand Down Expand Up @@ -54,10 +54,10 @@ insert {database}.{schema}.archive_expected (
`gender`,
`ip_address`,
`updated_at`,
`valid_from`,
`valid_to`,
`dbt_valid_from`,
`dbt_valid_to`,
`dbt_updated_at`,
`scd_id`
`dbt_scd_id`
)

select
Expand All @@ -69,10 +69,10 @@ select
`ip_address`,
`updated_at`,
-- fields added by archival
`updated_at` as `valid_from`,
cast(null as timestamp) as `valid_to`,
`updated_at` as `dbt_valid_from`,
cast(null as timestamp) as `dbt_valid_to`,
`updated_at` as `dbt_updated_at`,
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `scd_id`
to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `dbt_scd_id`
from {database}.{schema}.seed
where `id` > 20;

0 comments on commit 596b0c7

Please sign in to comment.