Skip to content

Commit

Permalink
fix: Raise if apply returns different types (#20168)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Dec 5, 2024
1 parent 7116b72 commit dc54699
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ where
self.fast_explode = false;
}
let physical = s.to_physical_repr();
let ca = physical.unpack::<T>()?;
let ca = physical.unpack::<T>().map_err(|_| {
polars_err!(SchemaMismatch: "cannot build list with different dtypes
Expected {}, got {}.", self.field.dtype(), s.dtype())
})?;
let values = self.builder.mut_values();

ca.downcast_iter().for_each(|arr| {
Expand Down
73 changes: 55 additions & 18 deletions crates/polars-core/src/chunked_array/from_iterator_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,38 +176,75 @@ fn materialize_list(
dtype: DataType,
value_capacity: usize,
list_capacity: usize,
) -> ListChunked {
) -> PolarsResult<ListChunked> {
let mut builder = get_list_builder(&dtype, value_capacity, list_capacity, name);
for v in vectors {
for val in v {
builder.append_opt_series(val.as_ref()).unwrap();
builder.append_opt_series(val.as_ref())?;
}
}
builder.finish()
Ok(builder.finish())
}

impl FromParallelIterator<Option<Series>> for ListChunked {
fn from_par_iter<I>(par_iter: I) -> Self
where
I: IntoParallelIterator<Item = Option<Series>>,
{
let vectors = collect_into_linked_list_vec(par_iter);
list_from_par_iter(par_iter, PlSmallStr::EMPTY).unwrap()
}
}

let list_capacity: usize = get_capacity_from_par_results(&vectors);
let value_capacity = get_value_cap(&vectors);
let dtype = get_dtype(&vectors);
if let DataType::Null = dtype {
ListChunked::full_null_with_dtype(PlSmallStr::EMPTY, list_capacity, &DataType::Null)
} else {
materialize_list(
PlSmallStr::EMPTY,
&vectors,
dtype,
value_capacity,
list_capacity,
)
pub fn list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
where
I: IntoParallelIterator<Item = Option<Series>>,
{
let vectors = collect_into_linked_list_vec(par_iter);

let list_capacity: usize = get_capacity_from_par_results(&vectors);
let value_capacity = get_value_cap(&vectors);
let dtype = get_dtype(&vectors);
if let DataType::Null = dtype {
Ok(ListChunked::full_null_with_dtype(
name,
list_capacity,
&DataType::Null,
))
} else {
materialize_list(name, &vectors, dtype, value_capacity, list_capacity)
}
}

pub fn try_list_from_par_iter<I>(par_iter: I, name: PlSmallStr) -> PolarsResult<ListChunked>
where
I: IntoParallelIterator<Item = PolarsResult<Option<Series>>>,
{
fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ {
move |item| match item {
Ok(item) => Some(item),
Err(error) => {
// We don't need a blocking `lock()`, as anybody
// else holding the lock will also be writing
// `Some(error)`, and then ours is irrelevant.
if let Ok(mut guard) = saved.try_lock() {
if guard.is_none() {
*guard = Some(error);
}
}
None
},
}
}

let saved_error = Mutex::new(None);
let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some();

let collection = list_from_par_iter(iter, name)?;

match saved_error.into_inner().unwrap() {
Some(error) => Err(error),
None => Ok(collection),
}
}

impl FromParIterWithDtype<Option<Series>> for ListChunked {
Expand All @@ -221,7 +258,7 @@ impl FromParIterWithDtype<Option<Series>> for ListChunked {
let list_capacity: usize = get_capacity_from_par_results(&vectors);
let value_capacity = get_value_cap(&vectors);
if let DataType::List(dtype) = dtype {
materialize_list(name, &vectors, *dtype, value_capacity, list_capacity)
materialize_list(name, &vectors, *dtype, value_capacity, list_capacity).unwrap()
} else {
panic!("expected list dtype")
}
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::sync::OnceLock;

use polars_core::chunked_array::builder::get_list_builder;
use polars_core::chunked_array::from_iterator_par::try_list_from_par_iter;
use polars_core::prelude::*;
use polars_core::POOL;
#[cfg(feature = "parquet")]
Expand Down Expand Up @@ -181,7 +182,7 @@ impl ApplyExpr {

out
} else {
POOL.install(|| iter.collect::<PolarsResult<_>>())?
POOL.install(|| try_list_from_par_iter(iter, PlSmallStr::EMPTY))?
}
} else {
agg.list()
Expand Down
9 changes: 9 additions & 0 deletions py-polars/tests/unit/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,3 +726,12 @@ def test_raise_column_not_found_in_join_arg() -> None:
b = pl.DataFrame({"y": [1, 2, 3]})
with pytest.raises(pl.exceptions.ColumnNotFoundError):
a.join(b, on="y")


def test_raise_on_different_results_20104() -> None:
df = pl.DataFrame({"x": [1, 2]})

with pytest.raises(pl.exceptions.SchemaError):
df.rolling("x", period="3i").agg(
result=pl.col("x").gather_every(2, offset=1).map_batches(pl.Series.min)
)

0 comments on commit dc54699

Please sign in to comment.