
Snowflake destination does not respect compound primary_key in MERGE statements (merge upsert strategy); also implements merge_key in delete-insert mode in an unexpected way #2320

Open
acaruso7 opened this issue Feb 17, 2025 · 3 comments
Labels
wontfix This will not be worked on

Comments

@acaruso7

dlt version

1.5.0

Describe the problem

https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1739564309666949

The Snowflake destination does not respect a compound primary_key in the MERGE statements generated by the incremental merge upsert strategy. Instead, it defaults to using only the primary key detected from the sql_table() source in the MERGE expression.

Example:

MERGE INTO "PROD_SCHEMA"."TABLE" d USING "STAGING_SCHEMA"."TABLE" s ON d."ID" = s."ID" WHEN MATCHED THEN UPDATE SET
. . .

is generated by the Snowflake destination when the pipeline is configured as:

        sql_table(
            credentials=mysql_engine,
            table=source_table_name,
            backend="pyarrow",
            chunk_size=200000,
            incremental=(
                dlt.sources.incremental(
                   "updated_at",
                    initial_value=incremental_timestamp_start,
                    end_value=incremental_timestamp_end,
                    primary_key=("id", "created_at_year"),
                )
            ),
            write_disposition={"disposition": "merge", "strategy": "upsert"},
        )

The Snowflake destination also behaves unexpectedly when using the incremental merge delete-insert strategy with a compound merge_key. The DELETE statements in the destination OR together the join condition for the primary_key (which, as mentioned above, I don't seem to be able to set) with an additional join condition produced by the compound merge_key config.

Example:

DELETE FROM "PROD_SCHEMA"."TABLE" as d WHERE EXISTS (SELECT 1 FROM "STAGING_SCHEMA"."TABLE" as s WHERE d."ID" = s."ID" OR d."ID" = s."ID" AND d."CREATED_AT_YEAR" = s."CREATED_AT_YEAR");
. . .

is generated by the Snowflake destination when the pipeline is configured as:

        sql_table(
            credentials=mysql_engine,
            table=source_table_name,
            backend="pyarrow",
            chunk_size=200000,
            incremental=(
                dlt.sources.incremental(
                   "updated_at",
                    initial_value=incremental_timestamp_start,
                    end_value=incremental_timestamp_end,
                    primary_key=("id"),
                )
            ),
            write_disposition={"disposition": "merge", "strategy": "delete-insert"},
        ).apply_hints(merge_key=("id", "created_at_year"))

Expected behavior

sql_table() source with Snowflake destination configured as below:

        sql_table(
            credentials=mysql_engine,
            table=source_table_name,
            backend="pyarrow",
            chunk_size=200000,
            incremental=(
                dlt.sources.incremental(
                   "updated_at",
                    initial_value=incremental_timestamp_start,
                    end_value=incremental_timestamp_end,
                    primary_key=("id", "created_at_year"),
                )
            ),
            write_disposition={"disposition": "merge", "strategy": "upsert"},
        )

should produce the following join condition in the generated Snowflake MERGE statement:

MERGE INTO . . . ON d."ID" = s."ID" AND d."CREATED_AT_YEAR" = s."CREATED_AT_YEAR"

sql_table() source with Snowflake destination configured as below:

        sql_table(
            credentials=mysql_engine,
            table=source_table_name,
            backend="pyarrow",
            chunk_size=200000,
            incremental=(
                dlt.sources.incremental(
                   "updated_at",
                    initial_value=incremental_timestamp_start,
                    end_value=incremental_timestamp_end,
                    primary_key=("id"),
                )
            ),
            write_disposition={"disposition": "merge", "strategy": "delete-insert"},
        ).apply_hints(merge_key=("id", "created_at_year"))

should produce the following join condition in the generated Snowflake DELETE statement:

DELETE FROM . . . WHERE EXISTS(. . . d."ID" = s."ID" AND d."CREATED_AT_YEAR" = s."CREATED_AT_YEAR");
. . .

Columns that are part of both the primary_key and the merge_key should only be considered once in the join condition, and never OR'd together.

Steps to reproduce

I'm not sure how to provide a full reproduction, as I don't have a public Snowflake instance where I can demonstrate the behavior, but the snippets above should get most of the way there.
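A minimal sketch of the remaining wiring (hypothetical pipeline and dataset names; Snowflake credentials are assumed to come from dlt config/secrets) would be:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="compound_pk_repro",  # hypothetical name
    destination="snowflake",
    dataset_name="prod_schema",  # hypothetical dataset
)
# `source` is the sql_table(...) resource configured as in the first snippet above
pipeline.run(source)
# the generated MERGE statement should then be visible in Snowflake's query history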

Operating system

Linux

Runtime environment

Docker, Docker Compose

Python version

3.10

dlt data source

sql_table()

dlt destination

Snowflake

Other deployment details

No response

Additional information

No response

@sh-rp
Collaborator

sh-rp commented Feb 18, 2025

@acaruso7: the primary key defined in the incremental is not used to control the materialization in the destination, but to deduplicate incoming data within a load. If you need a compound primary key for your sql_table, you can define one with sql_table_instance.apply_hints(primary_key=("blah", "bluh")). The reason you are seeing the ID being used for the merge is that you appear to have a primary key of "id" defined in the SQL schema of the source database.
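A minimal sketch of that suggestion applied to the resource from the issue (same variable names as above, incremental arguments omitted for brevity, untested against Snowflake):

source = sql_table(
    credentials=mysql_engine,
    table=source_table_name,
    backend="pyarrow",
    write_disposition={"disposition": "merge", "strategy": "upsert"},
)
# the compound key that should drive the MERGE join condition is set as a table hint,
# not as an argument of dlt.sources.incremental
source.apply_hints(primary_key=("id", "created_at_year"))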

@sh-rp added the wontfix label Feb 18, 2025
@acaruso7
Author

@sh-rp thanks for the response

I see now that the incremental primary_key argument doesn't do the same thing as the primary_key hint on the sql_table resource instance.

This is a little confusing naming-wise; personally I think it would be clearer to use something like dedup_key for the incremental argument and merge_key for the sql_table hint (which already exists as a possible hint on this resource, see below).

If I can control the join condition for the SQL MERGE expressions in the destination using sql_table_instance.apply_hints(primary_key=("my", "key")), then I'm not sure what the purpose of the separate merge_key argument is; the merge_key columns just get OR'd onto the join expression generated by the primary_key columns. If you need additional join clauses for your merge, why not just include them in the primary_key hint? The OR is typically going to be bad for performance in many database destinations:

return sa.or_(*clauses) # type: ignore[no-any-return]

One other follow-up question for you: is there a way I can easily log/print the raw merge SQL generated by dlt and submitted to the destination? That would make configuring things a lot more straightforward and make it easier to understand how various config changes show up in my destination.

@acaruso7
Author

Note for future readers: I think the reason I was encountering this

Snowflake destination does not respect compound primary_key in MERGE statements

is a combination of the following:

  • I was using the primary_key incremental arg (which is meant for deduping inbound data, not for merge joins) instead of sql_table.apply_hints(primary_key=("my", "key"))
  • I was not resetting my dlt pipeline state between runs after modifying my hints; I needed to delete both my local and remote pipeline state before re-running for the changes to take effect:
rm -r /var/dlt/pipelines/mypipeline
delete from db.schema._dlt_pipeline_state
where pipeline_name = 'mypipeline';
