optimize email list id unnesting #28

Merged · 10 commits · May 3, 2023
3 changes: 3 additions & 0 deletions .buildkite/scripts/run_models.sh
@@ -17,7 +17,10 @@ echo `pwd`
cd integration_tests
dbt deps
dbt seed --target "$db" --full-refresh
dbt compile --target "$db"
dbt run --target "$db" --full-refresh
dbt run --target "$db"
dbt test --target "$db"
dbt run --vars '{iterable__using_campaign_label_history: false, iterable__using_user_unsubscribed_message_type_history: false, iterable__using_campaign_suppression_list_history: false, iterable__using_user_device_history: true}' --target "$db" --full-refresh
dbt run --vars '{iterable__using_campaign_label_history: false, iterable__using_user_unsubscribed_message_type_history: false, iterable__using_campaign_suppression_list_history: false, iterable__using_user_device_history: true}' --target "$db"
dbt test --target "$db"
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,14 @@
# dbt_iterable v0.7.0
[PR #28](https://github.com/fivetran/dbt_iterable/pull/28) adds the following changes:

## 🚨 Breaking Changes 🚨
- Adjusts the default materialization of `int_iterable__list_user_history` from a view to a table, in order to optimize the runtime of the downstream `int_iterable__list_user_unnest` model.
- Updates `int_iterable__list_user_unnest` to be materialized as an incremental table. To support this, we also added a new `unique_key` field (a surrogate key hashed on `email`, `list_id`, and `updated_at`) and a `date_day` field to partition by on BigQuery and Databricks.
- **You will need to run a full refresh first to pick up the new columns**.
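
  As a sketch, the upgrade could look like the following; the `package:iterable` selector and `prod` target are illustrative, so adjust them to your own project and profile:

  ```bash
  # One-time full refresh so the incremental model picks up the new
  # unique_key and date_day columns (selector and target are examples).
  dbt run --full-refresh --select package:iterable --target prod

  # Subsequent runs can return to normal incremental behavior.
  dbt run --select package:iterable --target prod
  ```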

## Under the Hood
- Adds a `coalesce` to `previous_email_ids` in the `int_iterable__list_user_history` model, so that records with no previous email list IDs (i.e. a user's first record) are not dropped by a null comparison.
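
  A minimal, self-contained illustration of the pitfall this guards against (not the model code itself): in SQL, `x != null` evaluates to null rather than true, so the filter would silently discard first-ever records.

  ```sql
  -- Illustration only: a user's first history record has no previous IDs.
  with example as (

      select '[123]' as email_list_ids, cast(null as varchar) as previous_ids
  )

  select *
  from example
  -- Without the coalesce, '[123]' != null evaluates to null (not true) and
  -- the row is dropped; the sentinel string keeps first-ever records.
  where email_list_ids != coalesce(previous_ids, 'this is new')
  ```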

# dbt_iterable v0.6.0

## 🚨 Breaking Changes 🚨
2 changes: 1 addition & 1 deletion README.md
@@ -52,7 +52,7 @@ Include the following Iterable package version in your `packages.yml` file.
```yaml
packages:
- package: fivetran/iterable
version: [">=0.6.0", "<0.7.0"]
version: [">=0.7.0", "<0.8.0"]
```
## Step 3: Define database and schema variables
By default, this package runs using your destination and the `iterable` schema of your [target database](https://docs.getdbt.com/docs/running-a-dbt-project/using-the-command-line-interface/configure-your-profile). If this is not where your Iterable data is located (for example, if your Iterable schema is named `iterable_fivetran`), add the following configuration to your root `dbt_project.yml` file:
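
The configuration itself is collapsed in this diff view; the standard pattern for Fivetran source packages looks like the following sketch (variable names assumed from the package convention):

```yaml
vars:
    iterable_database: your_database_name
    iterable_schema: iterable_fivetran
```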
4 changes: 3 additions & 1 deletion dbt_project.yml
@@ -1,5 +1,5 @@
name: 'iterable'
version: '0.6.0'
version: '0.7.0'
config-version: 2
require-dbt-version: [">=1.3.0", "<2.0.0"]
models:
@@ -9,6 +9,8 @@ models:
intermediate:
+materialized: view
+schema: int_iterable
int_iterable__list_user_history:
+materialized: table
vars:
iterable:
campaign_history: "{{ ref('stg_iterable__campaign_history') }}"
7 changes: 5 additions & 2 deletions integration_tests/dbt_project.yml
@@ -1,6 +1,6 @@
config-version: 2
name: 'iterable_integration_tests'
version: '0.6.0'
version: '0.7.0'
profile: 'integration_tests'
vars:
iterable_source:
@@ -43,4 +43,7 @@ seeds:
channel_id: "{%- if target.type == 'bigquery' -%} INT64 {%- else -%} bigint {%- endif -%}"
user_unsubscribed_message_type_history_data:
+column_types:
message_type_id: "{%- if target.type == 'bigquery' -%} INT64 {%- else -%} bigint {%- endif -%}"
user_history_data:
+column_types:
updated_at: timestamp
2 changes: 1 addition & 1 deletion models/intermediate/int_iterable__list_user_history.sql
@@ -26,7 +26,7 @@ with user_history as (
updated_at

from previous_email_list_ids
where email_list_ids != previous_ids -- list ids are always stored in their arrays in numerical order
where email_list_ids != coalesce(previous_ids, 'this is new') -- list ids are always stored in their arrays in numerical order

), most_recent_list_ids as (

119 changes: 85 additions & 34 deletions models/intermediate/int_iterable__list_user_unnest.sql
@@ -1,27 +1,64 @@
{{ config( materialized='table') }}
-- materializing as a table because the computations here are fairly complex
{{ config(
materialized='incremental',
unique_key='unique_key',
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
partition_by={"field": "date_day", "data_type": "date"} if target.type not in ('spark','databricks') else ['date_day'],
file_format='delta',
on_schema_change='fail'
)
}}

with user_history as (

select *
from {{ ref('int_iterable__list_user_history') }}

/*
Unfortunately, `email_list_ids` are brought in as a JSON ARRAY, which different destinations handle completely differently.
The below code serves to extract and pivot list IDs into individual rows.
Records with an empty `email_list_ids` array will just have one row.
We are making the assumption that a user will not have more than 1000 lists. If that's wrong please open an issue!
https://github.com/fivetran/dbt_iterable/issues/new/choose
*/
from {{ ref('int_iterable__list_user_history') }} as user_history

{% if is_incremental() %}
{# the only rows we potentially want to overwrite are active ones #}
where user_history.updated_at >= coalesce((select min(updated_at) from {{ this }} where is_current), '2010-01-01')
{% endif %}

{% if target.type == 'redshift' %}
), numbers as (
select 0 as generated_number
union
select *
from (
{{ dbt_utils.generate_series(upper_bound=1000) }} )
{% endif %}
{# using PartiQL syntax to work with redshift's SUPER types #}
), redshift_parse_email_lists as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
phone_number,
updated_at,
is_current,
email_list_ids,
{# let's not remove empty array-rows #}
json_parse(case when email_list_ids = '[]' then '["is_null"]' else email_list_ids end) as super_email_list_ids

from user_history

), unnest_email_array as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
phone_number,
updated_at,
is_current,
{# go back to strings #}
cast(email_list_ids as {{ dbt.type_string() }}) as email_list_ids,
cast(email_list_id as {{ dbt.type_string() }}) as email_list_id

from redshift_parse_email_lists as emails, emails.super_email_list_ids as email_list_id

{% else %}
), unnest_email_array as (

select
email,
first_name,
@@ -36,33 +73,44 @@ https://github.com/fivetran/dbt_iterable/issues/new/choose
case when email_list_ids != '[]' then
{% if target.type == 'snowflake' %}
email_list_id.value
{% elif target.type == 'redshift' %}
json_extract_array_element_text(email_list_ids, cast(numbers.generated_number as {{ dbt.type_int() }}), true)
{% else %} email_list_id
{% endif %}
else null end
as
email_list_id
{% else %} email_list_id {% endif %} else null end as email_list_id

from user_history

cross join
{% if target.type == 'snowflake' %}
cross join
table(flatten(cast(email_list_ids as VARIANT))) as email_list_id
{% elif target.type == 'bigquery' %}
cross join
unnest(JSON_EXTRACT_STRING_ARRAY(email_list_ids)) as email_list_id
{% elif target.type == 'redshift' %}
numbers
where numbers.generated_number < json_array_length(email_list_ids, true)
or (numbers.generated_number + json_array_length(email_list_ids, true) = 0)
{% else %}
/* postgres */
{# postgres #}
cross join
json_array_elements_text(cast((
case when email_list_ids = '[]' then '["is_null"]' -- to not remove empty array-rows
case when email_list_ids = '[]' then '["is_null"]' {# to not remove empty array-rows #}
else email_list_ids end) as json)) as email_list_id
{%- endif %}

{%- endif -%}
), adjust_nulls as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
updated_at,
phone_number,
is_current,
case when email_list_ids = '["is_null"]' then '[]' else email_list_ids end as email_list_ids,
cast(NULLIF(email_list_id, 'is_null') as {{ dbt.type_int() }}) as list_id

from unnest_email_array

), final as (

select
email,
first_name,
@@ -74,8 +122,11 @@ https://github.com/fivetran/dbt_iterable/issues/new/choose
phone_number,
is_current,
email_list_ids,
cast(email_list_id as {{ dbt.type_int() }}) as list_id
from unnest_email_array
list_id,
{{ dbt_utils.generate_surrogate_key(["email", "list_id", "updated_at"]) }} as unique_key,
cast( {{ dbt.date_trunc('day', 'updated_at') }} as date) as date_day

from adjust_nulls
)

select *