Skip to content

Commit

Permalink
feat: support incremental model (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 13, 2024
1 parent 4b2f057 commit 2a14f6b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 22 deletions.
13 changes: 0 additions & 13 deletions dbt/include/risingwave/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,6 @@
{% endmacro %}


-- temporary disable temp table for lacking support to rename table
{% macro risingwave__make_temp_relation(base_relation, suffix) %}
{%- set temp_identifier = base_relation.identifier -%}
{%- set temp_relation = base_relation.incorporate(
path={"identifier": temp_identifier}) -%}

{{ return(temp_relation) }}
{% endmacro %}

{% macro risingwave__make_intermediate_relation(base_relation, suffix) %}
{{ return(make_temp_relation(base_relation, suffix)) }}
{% endmacro %}

{% macro risingwave__get_create_index_sql(relation, index_dict) -%}
{%- set index_config = adapter.parse_index(index_dict) -%}
{%- set comma_separated_columns = ", ".join(index_config.columns) -%}
Expand Down
100 changes: 91 additions & 9 deletions dbt/include/risingwave/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,92 @@
{% materialization incremental, adapter='risingwave' %}
{{ exceptions.raise_compiler_error(
"""
dbt-risingwave does not support incremental models, but we provide a `materialized_view` model
which could keep your data up-to-date automatically and incrementally.
Use the `materialized_view` instead.
"""
)}}
{% endmaterialization %}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{%- set temp_relation = make_temp_relation(target_relation)-%}
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{% if existing_relation is none %}
{% set build_sql = risingwave__create_table_as(target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = risingwave__create_table_as(intermediate_relation, sql) %}
{% set need_swap = true %}
{% else %}
{% do run_query(risingwave__create_table_as(temp_relation, sql)) %}
{% do to_drop.append(temp_relation) %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

0 comments on commit 2a14f6b

Please sign in to comment.