
adbc_ingest() is dropping rows in Snowflake #1847

Closed
davlee1972 opened this issue May 9, 2024 · 22 comments · Fixed by #1866
Labels
Type: bug Something isn't working

Comments

@davlee1972

davlee1972 commented May 9, 2024

What happened?

I'm trying to load 98 million rows from a set of CSV files (a 5 year period), but only 95 to 96 million rows are getting inserted into Snowflake using adbc_ingest. The distribution of missing data is fairly random, around ~16k records per day.

I tried passing both a pyarrow Table and record batches to adbc_ingest(). In both cases rows are being dropped.

Here's a screenshot of my notebook code:
[screenshot: notebook code calling adbc_ingest()]

The odd thing is that sometimes it inserts 95 million rows and other times it inserts 96 million rows. The total sum of inserted rows matches what I'm seeing in Snowflake logs if I add up all the rows created using COPY INTO SQL commands.

It looks like we're not sending all the batches across the wire.
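
For reference, the ingest call in the screenshot above is roughly of this shape (a minimal sketch; the connection URI, CSV path, and table name are placeholders, not the actual notebook code):

```python
import pyarrow.dataset as ds
import adbc_driver_snowflake.dbapi

# Read the CSV files into a single pyarrow Table.
table = ds.dataset("csv_files/", format="csv").to_table()

# Placeholder Snowflake DSN; the real one carries account/warehouse details.
with adbc_driver_snowflake.dbapi.connect("user:pass@account/db/schema") as conn:
    with conn.cursor() as cur:
        rows = cur.adbc_ingest("MY_TABLE", table, mode="create_append")
    conn.commit()

print(rows, "rows reported vs", table.num_rows, "rows in the source")
```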

How can we reproduce the bug?

No response

Environment/Setup

Python 3.9.10 on Red Hat 8 Linux with ADBC drivers 0.10.0.

@davlee1972 davlee1972 added the Type: bug Something isn't working label May 9, 2024
@lidavidm lidavidm added this to the ADBC Libraries 13 milestone May 10, 2024
@zeroshade
Member

That's really weird. Can you confirm whether the issue still exists in the newer ADBC driver versions while I take a look and see if anything stands out that could be causing it? I'm guessing there's a race condition somewhere in there.

@davlee1972
Author

I just upgraded to 0.11.0 and changed my table name so it can use a quoted identifier.

Same result.

[screenshot]

[screenshot]

@davlee1972
Author

davlee1972 commented May 13, 2024

Here's the table create statement which adbc_ingest() generated with mode="create_append". I'm going to do some double checking later today or tomorrow to see if there are any issues with using "double" types, but there shouldn't be with Arrow Float32s, which are single-precision floats.

[screenshot: generated CREATE TABLE statement]

@davlee1972
Author

davlee1972 commented May 13, 2024

I reviewed the data differences between the source and the Snowflake table and I don't see any patterns. Large numbers, small numbers, positive/negative/0.0 records are randomly missing. With 95,707,710 out of 98 million rows inserted, I see missing data for Jan 18th, 2018, which is one of the first occurrences.

[screenshot]

Running this a 2nd time, I get 96,017,343 out of 98 million rows inserted, and now Jan 18th, 2018 isn't missing any data.

[screenshot]

@davlee1972
Author

I tried adjusting the settings below and it didn't help. There is most likely a bug with these parameters.

If I pass them in as integers I get an error: ValueError: value must be str or bytes
adbc.snowflake.rpc.ingest_update_concurrency: 1

If I pass them in as a string it doesn't appear to do anything; the activity logs don't reflect the expected behavior.
adbc.snowflake.rpc.ingest_update_concurrency: "1"

[screenshot]
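
The values do have to be strings with current driver versions (see the explanation further down). A rough sketch of setting one of these options, assuming the dbapi cursor exposes the underlying statement as cursor.adbc_statement; the option name here is the one from the comment above:

```python
# Passing an int raises "ValueError: value must be str or bytes",
# so the value has to be given as a string.
cur.adbc_statement.set_options(**{
    "adbc.snowflake.rpc.ingest_update_concurrency": "1",
})
rows = cur.adbc_ingest("MY_TABLE", table, mode="append")
```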

@zeroshade
Member

I'm gonna try to find some time later this week to look at this. In the meantime, @joellubi would you happen to have some time to dig into this?

@joellubi
Member

Sure, taking a look.

@davlee1972 I did notice a typo in the option name: I think that by adbc.snowflake.rpc.ingest_update_concurrency you mean to actually specify adbc.snowflake.rpc.ingest_upload_concurrency.

@joellubi
Member

joellubi commented May 15, 2024

I haven't been able to reproduce this so far even with an ingest of this volume (running 0.10.0). I haven't tested extreme values for any of the fields yet but it's not yet clear that the issue is related to that.

[screenshot]

Are there any COPY errors found by running the following query? You may need to add more filters to isolate a particular run.

select * from snowflake.account_usage.copy_history where table_name = '<TABLE_NAME>' AND error_count > 0;
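
That check can also be run over the same ADBC connection; a quick sketch, assuming an open dbapi connection conn and a placeholder table name:

```python
with conn.cursor() as cur:
    cur.execute(
        "select * from snowflake.account_usage.copy_history "
        "where table_name = 'MY_TABLE' and error_count > 0"
    )
    print(cur.fetch_arrow_table())
```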

@davlee1972
Author

davlee1972 commented May 15, 2024

Here's the copy history. I don't see any errors, but there are a ton of 256k empty parquet files being sent.
The total sum of rows parsed adds up to 95 million, which is 3 million short.

I think I made a typo in the comments above for the RPC parameters, but I'll retest all four RPC parameters again, trying to eliminate any type of concurrency while debugging this. I'm using an X-SMALL warehouse, so I'm wondering if there might be issues when the concurrency settings exceed the number of warehouse cores.

[screenshot: copy history]

@davlee1972
Author

davlee1972 commented May 15, 2024

OK, I figured it out. There is a bug with how adbc_ingest is handling batches.

I reduced the number of records to 1.1 million rows and the bug still happens.

If I read data from a pyarrow dataset of parquet files and try to write it to Snowflake, I get ZERO rows inserted.

If I write my data to a single parquet file, re-read it, and then try to write it to Snowflake, I get all my rows inserted.

[screenshot: notebook code for both scenarios]
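
A rough sketch of the two read paths being compared, with placeholder paths, filter, and column names rather than the actual notebook code:

```python
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Path 1: filtered scan over many parquet files -> a Table with many chunks,
# some of them zero-length. Ingesting this dropped rows (or inserted zero).
scan_data = ds.dataset("parquet_files/", format="parquet").to_table(
    filter=ds.field("year") == 2018
)

# Path 2: round-trip through a single parquet file -> a Table with few chunks.
# Ingesting this inserted every row.
pq.write_table(scan_data, "single_file.parquet")
single_file_data = pq.read_table("single_file.parquet")
```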

On a side note, I'm not sure why these params are STRINGs and not INTs.

[screenshot]

@zeroshade
Member

@davlee1972 They are strings and not ints primarily because the corresponding functions in the Snowflake driver haven't been implemented; only the default SetOption that takes a string has been. SetOptionInt and SetOptionDouble aren't currently implemented for the Snowflake driver, which is why it requires the option values to be strings.

If I read data from a pyarrow dataset of parquet files and try to write it to Snowflake I get ZERO rows inserted..

Interesting, that's extremely odd. Looking at the code you provided screenshots for, scan_data in the dataset scenario is still a pyarrow.Table; it just appears to have a few zero-length chunks at the start. Maybe something is going wrong when dealing with the zero-length chunks?

@davlee1972
Author

They are both pyarrow.Tables, but the first one is the result of pyarrow.dataset.to_table() with filtering.

The first one has 1095 chunks, which matches the 1095 parquet files over a 4 year period, even though I'm only looking at a month of data. This is the result of the parallelized reads used in pyarrow.dataset.
[screenshot: table with 1095 chunks]

The second one only has 9 chunks for a single file.
[screenshot: table with 9 chunks]
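
The chunk layout can be checked directly; a small sketch, continuing from the scan_data / single_file_data variables in the sketch above:

```python
col = scan_data.column(0)
print(col.num_chunks)                     # 1095 chunks for the dataset scan
print([len(c) for c in col.chunks][:20])  # several zero-length chunks up front

print(single_file_data.column(0).num_chunks)  # 9 chunks for the single file
```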

@joellubi
Member

Thanks @davlee1972, that's a great insight. By setting several of the table's chunks to be empty, I can now reproduce the issue, getting a nondeterministic number of rows copied in each run.
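
For anyone else trying to reproduce it, a minimal way to build such a table (my own construction, not necessarily the exact repro used here):

```python
import pyarrow as pa

# A ChunkedArray with zero-length chunks interleaved among populated ones.
values = pa.chunked_array([
    pa.array([], type=pa.int64()),   # empty chunk
    pa.array(range(100)),
    pa.array([], type=pa.int64()),   # empty chunk
    pa.array(range(100, 200)),
])
table = pa.table({"val": values})

# Ingesting `table` via cur.adbc_ingest(...) then copies a nondeterministic
# subset of the 200 rows on affected driver versions.
```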

@zeroshade
Member

@joellubi can you replicate it with pure Go, or only through pyarrow with some table chunks set to be empty? Just trying to narrow down where the issue might be.

@joellubi
Member

@zeroshade Yes, I just got a pure Go reproduction.

With a record reader that produces 1 empty batch and then 10 batches of 100 rows (i.e. expecting 1000 rows):

joel@Joels-MacBook-Pro-2 20240514-sf-ingest-dropped-rows % go run main.go
2024/05/15 17:12:24 retained
2024/05/15 17:12:26 released
2024/05/15 17:12:26 1000 rows ingested
joel@Joels-MacBook-Pro-2 20240514-sf-ingest-dropped-rows % go run main.go
2024/05/15 17:12:33 retained
2024/05/15 17:12:36 released
2024/05/15 17:12:36 500 rows ingested
joel@Joels-MacBook-Pro-2 20240514-sf-ingest-dropped-rows % go run main.go
2024/05/15 17:12:45 retained
2024/05/15 17:12:49 released
2024/05/15 17:12:49 800 rows ingested

No code changes between those three runs. I also ran the same ingestion with the Postgres driver from Python and the issue does not reproduce under any conditions, so this seems to be specific to the Snowflake driver itself.

@zeroshade
Member

Awesome. So now we just gotta figure out whether the issue is in the Parquet writer or on Snowflake's side :) If you don't get the time to dig deeper, I should be able to poke at it tomorrow if you can post your repro.

@joellubi
Member

Sure @zeroshade, I ported the repro to a failing test case and pushed it up to #1866

@joellubi
Member

joellubi commented May 16, 2024

Update on the investigation. I can force a total failure (0 rows ingested) every time by setting OptionStatementIngestWriterConcurrency to 1. If I set it to 2, then 2 files get uploaded, one of which contains about half the rows and the other contains 0.

When I download the parquet files from the stage in Snowflake and read them locally, all rows are present in all files. In the test case I'm running there is exactly one empty record batch, and it seems that whichever parquet file that batch gets written to becomes "tainted". I'm not sure exactly in what way yet, but some tools read the file just fine (e.g. duckdb), while in Snowflake's case it parses 0 rows and doesn't report an error.

[screenshot]

[screenshot]
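
One quick way to inspect the downloaded stage files is the parquet footer metadata; a sketch with a placeholder file name (a zero in the per-row-group counts corresponds to the empty record batch):

```python
import pyarrow.parquet as pq

md = pq.ParquetFile("downloaded_stage_file.parquet").metadata
print(md.num_rows, md.num_row_groups)
print([md.row_group(i).num_rows for i in range(md.num_row_groups)])
```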

@zeroshade
Member

@joellubi I did a similar investigation and can confirm it: the problem appears to be caused by the existence of a row group with 0 rows in the file. We can work around it by using WriteBuffered instead of Write in bulk_ingestion.go, which supports the hypothesis that the empty row group is the cause. We should make the change on our end to use WriteBuffered, which shouldn't cause any significant difference in memory usage due to the file size limitations we use. But we should also bring this issue up with Snowflake for the more general case.
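
For users stuck on an affected driver version, a possible client-side mitigation (my sketch, separate from the WriteBuffered change) is to drop zero-length batches before handing the data to adbc_ingest():

```python
import pyarrow as pa

def drop_empty_batches(table: pa.Table) -> pa.RecordBatchReader:
    # Stream the table's batches, skipping the zero-length ones.
    batches = (b for b in table.to_batches() if b.num_rows > 0)
    return pa.RecordBatchReader.from_batches(table.schema, batches)

# `cur` is an open adbc_driver_snowflake cursor; `scan_data` the chunked table.
rows = cur.adbc_ingest("MY_TABLE", drop_empty_batches(scan_data), mode="append")
```

Calling table.combine_chunks() before ingesting should also avoid the empty chunks, at the cost of materializing larger contiguous buffers.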

zeroshade pushed a commit that referenced this issue May 21, 2024
…y batch is present (#1866)

Reproduces and fixes: #1847 

Parquet files with empty row groups are valid per the spec, but
Snowflake does not currently handle them properly. To mitigate this we
buffer writes to the parquet file so that a row group is not written
until some amount of data has been received.

The CheckedAllocator was enabled for all tests as part of this fix,
which detected a leak in the BufferWriter that was fixed in:
[https://github.com/apache/arrow/pull/41698](https://github.com/apache/arrow/pull/41698).

There was an unrelated test failure that surfaced once the
CheckedAllocator was enabled which had to do with casting decimals of
certain precision. The fix is included in this PR as well.
@joellubi
Member

Thanks for merging, @zeroshade. Any recommendations on the best way to inform Snowflake of the bug? It's not really related to any of their open source projects, so a GitHub issue doesn't seem appropriate.

@zeroshade
Member

I'll reach out to the individuals I've been working with on ADBC stuff and bring it up to them. Thanks!

@zeroshade
Member

Just wanted to follow up here: Snowflake is now aware of the issue and was able to replicate it. Hopefully they fix it soon.

zeroshade pushed a commit that referenced this issue Jul 29, 2024
…rgetSize on ingestion (#2026)

Fixes: #1997 

**Core Changes**
- Change ingestion `writeParquet` function to use unbuffered writer,
skipping 0-row records to avoid recurrence of #1847
- Use parquet writer's internal `RowGroupTotalBytesWritten()` method to
track output file size in favor of `limitWriter`
- Unit test to validate that file cutoff occurs precisely when expected

**Secondary Changes**
- Bump arrow dependency to `v18` to pull in the changes from
[ARROW-43326](apache/arrow#43326)
- Fix flightsql test that depends on hardcoded arrow version