
[1.14.0 Regression] scan_delta on empty partitioned tables fails #19854

Closed · 2 tasks done · TinoSM opened this issue Nov 18, 2024 · 7 comments · Fixed by #19884

Labels: accepted (Ready for implementation) · bug (Something isn't working) · needs triage (Awaiting prioritization by a maintainer) · python (Related to Python Polars)

Comments

TinoSM commented Nov 18, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl

# Download https://github.com/user-attachments/files/17802414/test_table.zip and unzip it

# Old 1.13.0 behaviour: the pyarrow-based reader still works
pl.scan_delta("./test_table", use_pyarrow=True).collect()
pl.scan_delta("./test_table", use_pyarrow=True).filter(pl.col("active") == 1).collect()

# New native reader: panics on collect
pl.scan_delta("./test_table").collect()
# ...and raises ColumnNotFoundError when filtering on the partition column:
# pl.scan_delta("./test_table").filter(pl.col("active") == 1).collect()

test_table.zip
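
If you would rather not download the attachment, here is a hedged sketch of how an equivalent table might be created (assuming deltalake's write_deltalake accepts an empty Arrow table together with partition_by; the schema mirrors the 1.13.1 output shown below):

import pyarrow as pa
from deltalake import write_deltalake

# An empty Delta table partitioned on "active": the Delta metadata and schema
# exist, but no Parquet data files have been written yet.
schema = pa.schema([("active", pa.int64()), ("id", pa.string())])
write_deltalake("./test_table", pa.Table.from_pylist([], schema=schema), partition_by=["active"])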

Log output

With 1.14.0:

>>> pl.scan_delta("/Users/xx/Downloads/test_table", use_pyarrow=False).filter(pl.col("active")==1).collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/xx/Projects/gdt-opendata/.venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 2029, in collect
    return wrap_df(ldf.collect(callback))
                   ^^^^^^^^^^^^^^^^^^^^^
polars.exceptions.ColumnNotFoundError: unable to find column "active"; valid columns: ["id"]

Or, even worse, if I remove the active filter:

pyo3_runtime.PanicException: called `Option::unwrap()` on a `None` value

With polars 1.13.1:

>>> pl.scan_delta("./test_table").collect()
shape: (0, 2)
┌────────┬─────┐
│ active ┆ id  │
│ ---    ┆ --- │
│ i64    ┆ str │
╞════════╪═════╡
└────────┴─────┘

Issue description

I come from #19103. The new changes to the Delta reader broke reading empty tables (which is what most of my tests use :) ).

Expected behavior

Partition filters (and plain collects) on empty tables work as they did in 1.13.1.
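
In other words, something like this sketch (a hypothetical expectation mirroring the 1.13.1 output above) should run cleanly:

import polars as pl

lf = pl.scan_delta("./test_table")
assert lf.collect_schema().names() == ["active", "id"]
lf.filter(pl.col("active") == 1).collect()  # shape: (0, 2)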

Installed versions

--------Version info---------
Polars:              1.14.0
Index type:          UInt32
Platform:            macOS-14.7.1-arm64-arm-64bit
Python:              3.11.10 (main, Sep  7 2024, 01:03:31) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               5.4.1
boto3                1.35.36
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            0.21.0
fastexcel            <not installed>
fsspec               2024.10.0
gevent               <not installed>
google.auth          2.36.0
great_tables         <not installed>
matplotlib           <not installed>
nest_asyncio         <not installed>
numpy                1.26.4
openpyxl             <not installed>
pandas               2.2.3
pyarrow              18.0.0
pydantic             2.9.2
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>

TinoSM commented Nov 18, 2024

Btw, I was having issues with the delta-rs scanner, and moving to the new version of the reader fixes them. So, for those interested, this serves as a dirty workaround in the meantime ;_; :

    import polars as pl

    def scan_active(table_path: str) -> pl.LazyFrame:
        # Wrapped in a function here so it runs standalone; in the original
        # this lives inside one of my wrappers.
        # TODO: this is just a simple read; the if/else works around
        # https://github.com/pola-rs/polars/issues/19854
        if "active" in pl.scan_delta(table_path, use_pyarrow=False).collect_schema():
            # Non-empty table: the new native reader sees the partition column.
            return pl.scan_delta(table_path, use_pyarrow=False).filter(
                pl.col("active") == 1
            )
        else:
            # Empty table: fall back to the pyarrow reader with a partition
            # filter. R.active is my own constant holding the column name.
            return pl.scan_delta(
                table_path,
                pyarrow_options={"partitions": [(R.active, "=", str(1))]},
                use_pyarrow=True,
            )

@AdrienDart

Hi, I have an issue that is likely related: filtering after scan_delta gets me a ShapeError with polars 1.14.0 but not with polars 1.13.1.


TinoSM commented Nov 19, 2024

> Hi, I have an issue that is likely related: filtering after scan_delta gets me a ShapeError with polars 1.14.0 but not with polars 1.13.1.

Did you try with use_pyarrow=True? That's likely the old behaviour; I changed that in my code to roll back to the 1.13.1 behaviour for now.
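
For example (an illustrative sketch; the table path is the one from the reproduction above):

    import polars as pl

    # The pyarrow-based reader keeps the 1.13.1 behaviour on empty tables.
    pl.scan_delta("./test_table", use_pyarrow=True).filter(pl.col("active") == 1).collect()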

@AdrienDart

That solves the issue! Thank you for your help!

@ion-elgreco (Contributor)

@AdrienDart you should still create an issue; it might be a different thing.


TinoSM commented Nov 20, 2024

I was also having issues with the Delta reader and schema evolution: as of now, if a Parquet file has a nested struct and a new field is added to that struct only in the Delta schema (i.e. no Parquet writes are done), the new field won't be found.

I modified the scan_delta function (locally, in my wrappers). This fixes two issues:

  1. Empty tables not fully complying with the Delta schema, and likewise Parquet files not fully complying with it.
  2. Fields missing in nested structs of the Parquet files but present in the Delta schema; reads now succeed.

One open question: I'm not sure diagonal_relaxed is what you want as "polars" (it works for me locally, of course), as it might have side effects on data sizes. They should be minor, though, since in theory the Parquet files within a Delta table should be compatible with the schema.

I don't have time to PR it right now (I'm busy fixing my own things locally...) and I'm using it in my architecture wrapper, but if someone wants to propose it as a fix before I get to it, feel free to use the code.

This is the last part of scan_delta in delta.py from polars 1.14.0, as seen here:
https://github.com/TinoSM/polars/blob/main/py-polars/polars/io/delta.py

    # Requires conversion through a pyarrow table because there is no direct
    # way yet to convert a delta schema into a polars schema
    delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
    empty_delta_schema_lf: pl.LazyFrame = from_arrow(
        pa.Table.from_pylist([], delta_schema)
    ).lazy()  # type: ignore[union-attr]
    polars_schema = empty_delta_schema_lf.collect_schema()
    partition_columns = dl_tbl.metadata().partition_columns

    def _split_schema(schema: Schema, partition_columns: list[str]) -> Schema:
        # Extract the hive (partition) columns from the full schema.
        if len(partition_columns) == 0:
            return Schema([])
        hive_schema = [
            (name, dtype)
            for name, dtype in schema.items()
            if name in partition_columns
        ]
        return Schema(hive_schema)

    # Required because the main schema cannot contain hive columns currently
    hive_schema = _split_schema(polars_schema, partition_columns)

    if dl_tbl.file_uris():
        parquet_df = scan_parquet(
            dl_tbl.file_uris(),
            schema=None,
            hive_schema=hive_schema,
            allow_missing_columns=True,
            hive_partitioning=len(partition_columns) > 0,
            storage_options=storage_options,
            rechunk=rechunk or False,
        )
        # Should we rechunk in the concat as well?
        return concat([empty_delta_schema_lf, parquet_df], how="diagonal_relaxed")
    else:
        return empty_delta_schema_lf
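
For context on the open question above, a minimal sketch (not part of the patch) of what diagonal_relaxed does: it fills columns missing from one input with nulls and upcasts mismatched dtypes to a common supertype, which is why concatenating with the empty schema frame restores columns the Parquet files lack:

    import polars as pl

    a = pl.LazyFrame(schema={"active": pl.Int64, "id": pl.String})  # empty, full schema
    b = pl.LazyFrame({"id": ["x"]})  # stands in for a file missing "active"

    # Rows coming from b get null for "active"; mismatched dtypes would be supertyped.
    print(pl.concat([a, b], how="diagonal_relaxed").collect())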

TinoSM added a commit to TinoSM/polars that referenced this issue Nov 20, 2024
Fixes pola-rs#19854

Fixes:
1. Empty tables not fully complying with the Delta schema, and likewise Parquet files not fully complying with it.
2. Fields missing in nested structs of the Parquet files but present in the Delta schema; reads now succeed.

Questions:
3. I'm not sure diagonal_relaxed is what you want as "polars" (it works for me locally, of course), as it might have side effects on data sizes. They should be minor, though, since in theory the Parquet files within a Delta table should be compatible with the schema.
4. Should we also rechunk in the concat? I think it's not needed, since it's just an empty DataFrame plus the real, already-rechunked DataFrame.
@AdrienDart

@ion-elgreco I would like to, but I'm currently failing to reduce it to a minimal reproducible example. The code looks like this and does not fail with use_pyarrow=True:

(
    pl.concat(
        [pl.scan_delta('a'),
         pl.scan_delta('b')]
    )
    .filter(pl.col('foo').str.contains('xyz'))
    .unique(['bar', 'goo'])
    .head()
    .collect()
)

and I get: ShapeError: unable to vstack, column names don't match: ...
