Skip to content

Commit

Permalink
Merge pull request #28 from fivetran/feature/optimize-list-unnesting
Browse files Browse the repository at this point in the history
optimize email list id unnesting
  • Loading branch information
fivetran-jamie authored May 3, 2023
2 parents 2fc2e75 + e2219ca commit 236e94e
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 43 deletions.
3 changes: 3 additions & 0 deletions .buildkite/scripts/run_models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ echo `pwd`
cd integration_tests
dbt deps
dbt seed --target "$db" --full-refresh
dbt compile --target "$db"
dbt run --target "$db" --full-refresh
dbt run --target "$db"
dbt test --target "$db"
dbt run --vars '{iterable__using_campaign_label_history: false, iterable__using_user_unsubscribed_message_type_history: false, iterable__using_campaign_suppression_list_history: false, iterable__using_user_device_history: true}' --target "$db" --full-refresh
dbt run --vars '{iterable__using_campaign_label_history: false, iterable__using_user_unsubscribed_message_type_history: false, iterable__using_campaign_suppression_list_history: false, iterable__using_user_device_history: true}' --target "$db"
dbt test --target "$db"
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# dbt_iterable v0.7.0
[PR #28](https://github.com/fivetran/dbt_iterable/pull/28) adds the following changes:

## 🚨 Breaking Changes 🚨
- Adjusts the default materialization of `int_iterable__list_user_history` from a view to a table. This was changed to optimize the runtime of the downstream `int_iterable__list_user_unnest` model.
- Updates `int_iterable__list_user_unnest` to be materialized as an incremental table. In order to add this logic, we also added a new `unique_key` field -- a surrogate key hashed on `email`, `list_id`, and `updated_at` -- and a `date_day` field to partition by on Bigquery + Databricks.
- **You will need to run a full refresh first to pick up the new columns**.

## Under the Hood
- Adds a `coalesce` to `previous_email_ids` in the `int_iterable__list_user_history` model, in case there are no previous email ids.
- Adjusts the `flatten` logic in `int_iterable__list_user_unnest` for Snowflake users.

# dbt_iterable v0.6.0

## 🚨 Breaking Changes 🚨
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Include the following Iterable package version in your `packages.yml` file.
```yaml
packages:
- package: fivetran/iterable
version: [">=0.6.0", "<0.7.0"]
version: [">=0.7.0", "<0.8.0"]
```
## Step 3: Define database and schema variables
By default, this package runs using your destination and the `iterable` schema of your [target database](https://docs.getdbt.com/docs/running-a-dbt-project/using-the-command-line-interface/configure-your-profile). If this is not where your Iterable data is located (for example, if your Iterable schema is named `iterable_fivetran`), add the following configuration to your root `dbt_project.yml` file:
Expand Down
4 changes: 3 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'iterable'
version: '0.6.0'
version: '0.7.0'
config-version: 2
require-dbt-version: [">=1.3.0", "<2.0.0"]
models:
Expand All @@ -9,6 +9,8 @@ models:
intermediate:
+materialized: view
+schema: int_iterable
int_iterable__list_user_history:
+materialized: table
vars:
iterable:
campaign_history: "{{ ref('stg_iterable__campaign_history') }}"
Expand Down
7 changes: 5 additions & 2 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
config-version: 2
name: 'iterable_integration_tests'
version: '0.6.0'
version: '0.7.0'
profile: 'integration_tests'
vars:
iterable_source:
Expand Down Expand Up @@ -43,4 +43,7 @@ seeds:
channel_id: "{%- if target.type == 'bigquery' -%} INT64 {%- else -%} bigint {%- endif -%}"
user_unsubscribed_message_type_history_data:
+column_types:
message_type_id: "{%- if target.type == 'bigquery' -%} INT64 {%- else -%} bigint {%- endif -%}"
message_type_id: "{%- if target.type == 'bigquery' -%} INT64 {%- else -%} bigint {%- endif -%}"
user_history_data:
+column_types:
updated_at: timestamp
14 changes: 11 additions & 3 deletions integration_tests/seeds/user_history_data.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
email,updated_at,first_name,last_name,phone_number,user_id,signup_date,signup_source,phone_number_carrier,phone_number_country_code_iso,phone_number_line_type,phone_number_updated_at,_fivetran_synced,email_list_ids
[email protected],2021-06-03 08:18:30.000,,,,,2021-06-03 08:14:55.000,Import,,,,,2021-06-03 09:18:13.877,"[826724,884398]"
[email protected],2021-06-03 08:32:01.000,,,,,2021-06-03 08:32:01.000,API,,,,,2021-06-03 09:18:13.708,"[]"
email,updated_at,_fivetran_synced,email_list_ids,first_name,last_name,phone_number,phone_number_carrier,phone_number_country_code_iso,phone_number_details,phone_number_line_type,phone_number_updated_at,signup_date,signup_source,user_id
[email protected],2018-09-14 08:29:13,2021-03-29 17:14:01,[162418],,,,,,,,,2018-09-14 08:29:13,Import,
[email protected],2018-09-18 17:24:11,2021-03-29 17:14:01,[163833],,,,,,,,,2018-09-18 17:22:15,API,
[email protected],2018-09-19 10:06:24,2021-03-29 17:14:01,[164228],,,,,,,,,2018-09-19 10:06:24,Import,22222
[email protected],2018-10-05 05:37:24,2021-03-29 17:14:01,"[159258,159261,162418]",person4,,,,,,,,2018-09-06 10:18:17,Import,string
[email protected],2018-10-05 05:46:47,2021-03-29 17:14:01,"[159246,159258,159261,162418,163833,173413]",person5,five,,ZZZ,0,,Phone,,2018-09-06 9:11:43,Import,1111
[email protected],2018-10-06 11:36:18,2021-03-29 17:14:01,[173950],,,,,,,,,2018-10-06 11:36:18,Import,
[email protected],2021-04-20 20:31:29,2021-04-20 20:56:40,[953900],,,,,,,,,2021-04-20 20:31:30,Import,
[email protected],2021-04-20 20:43:37,2021-04-20 20:56:40,[953900],,,,,,,,,2021-04-20 20:30:16,Import,
[email protected],2021-04-21 20:21:29,2021-04-21 20:28:04,[],,,,,,,,,2021-04-21 20:21:29,API,person_test
[email protected],2021-04-21 20:59:40,2021-04-21 21:01:52,[953900],,,,,,,,,2021-04-20 20:30:16,Import,
2 changes: 1 addition & 1 deletion models/intermediate/int_iterable__list_user_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ with user_history as (
updated_at

from previous_email_list_ids
where email_list_ids != previous_ids -- list ids are always stored in their arrays in numerical order
where email_list_ids != coalesce(previous_ids, 'this is new') -- list ids are always stored in their arrays in numerical order

), most_recent_list_ids as (

Expand Down
121 changes: 86 additions & 35 deletions models/intermediate/int_iterable__list_user_unnest.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,64 @@
{{ config( materialized='table') }}
-- materializing as a table because the computations here are fairly complex
{{ config(
materialized='incremental',
unique_key='unique_key',
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
partition_by={"field": "date_day", "data_type": "date"} if target.type not in ('spark','databricks') else ['date_day'],
file_format='delta',
on_schema_change='fail'
)
}}

with user_history as (

select *
from {{ ref('int_iterable__list_user_history') }}

/*
Unfortunately, `email_list_ids` are brought in as a JSON ARRAY, which different destinations handle completely differently.
The below code serves to extract and pivot list IDs into individual rows.
Records with an empty `email_list_ids` array will just have one row.
We are making the assumption that a user will not have more than 1000 lists. If that's wrong please open an issue!
https://github.com/fivetran/dbt_iterable/issues/new/choose
*/
from {{ ref('int_iterable__list_user_history') }} as user_history

{% if is_incremental() %}
{# the only rows we potentially want to overwrite are active ones #}
where user_history.updated_at >= coalesce((select min(updated_at) from {{ this }} where is_current), '2010-01-01')
{% endif %}

{% if target.type == 'redshift' %}
), numbers as (
select 0 as generated_number
union
select *
from (
{{ dbt_utils.generate_series(upper_bound=1000) }} )
{% endif %}
{# using PartiQL syntax to work with redshift's SUPER types #}
), redshift_parse_email_lists as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
phone_number,
updated_at,
is_current,
email_list_ids,
{# let's not remove empty array-rows #}
json_parse(case when email_list_ids = '[]' then '["is_null"]' else email_list_ids end) as super_email_list_ids

from user_history

), unnest_email_array as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
phone_number,
updated_at,
is_current,
{# go back to strings #}
cast(email_list_ids as {{ dbt.type_string() }}) as email_list_ids,
cast(email_list_id as {{ dbt.type_string() }}) as email_list_id

from redshift_parse_email_lists as emails, emails.super_email_list_ids as email_list_id

{% else %}
), unnest_email_array as (

select
email,
first_name,
Expand All @@ -36,33 +73,44 @@ https://github.com/fivetran/dbt_iterable/issues/new/choose
case when email_list_ids != '[]' then
{% if target.type == 'snowflake' %}
email_list_id.value
{% elif target.type == 'redshift' %}
json_extract_array_element_text(email_list_ids, cast(numbers.generated_number as {{ dbt.type_int() }}), true)
{% else %} email_list_id
{% endif %}
else null end
as
email_list_id
{% else %} email_list_id {% endif %} else null end as email_list_id

from user_history

cross join
{% if target.type == 'snowflake' %}
table(flatten(cast(email_list_ids as VARIANT))) as email_list_id
cross join
table(flatten(input => parse_json(email_list_ids))) as email_list_id
{% elif target.type == 'bigquery' %}
cross join
unnest(JSON_EXTRACT_STRING_ARRAY(email_list_ids)) as email_list_id
{% elif target.type == 'redshift' %}
numbers
where numbers.generated_number < json_array_length(email_list_ids, true)
or (numbers.generated_number + json_array_length(email_list_ids, true) = 0)
{% else %}
/* postgres */
{# postgres #}
cross join
json_array_elements_text(cast((
case when email_list_ids = '[]' then '["is_null"]' -- to not remove empty array-rows
case when email_list_ids = '[]' then '["is_null"]' {# to not remove empty array-rows #}
else email_list_ids end) as json)) as email_list_id
{%- endif %}

{%- endif -%}
), adjust_nulls as (

select
email,
first_name,
last_name,
user_id,
signup_date,
signup_source,
updated_at,
phone_number,
is_current,
case when email_list_ids = '["is_null"]' then '[]' else email_list_ids end as email_list_ids,
cast(NULLIF(email_list_id, 'is_null') as {{ dbt.type_int() }}) as list_id

from unnest_email_array

), final as (

select
email,
first_name,
Expand All @@ -74,8 +122,11 @@ https://github.com/fivetran/dbt_iterable/issues/new/choose
phone_number,
is_current,
email_list_ids,
cast(email_list_id as {{ dbt.type_int() }}) as list_id
from unnest_email_array
list_id,
{{ dbt_utils.generate_surrogate_key(["email", "list_id", "updated_at"]) }} as unique_key,
cast( {{ dbt.date_trunc('day', 'updated_at') }} as date) as date_day

from adjust_nulls
)

select *
Expand Down

0 comments on commit 236e94e

Please sign in to comment.