diff --git a/crates/polars-core/src/chunked_array/builder/list/primitive.rs b/crates/polars-core/src/chunked_array/builder/list/primitive.rs index d9e74536bf13..5ae8a172d4c0 100644 --- a/crates/polars-core/src/chunked_array/builder/list/primitive.rs +++ b/crates/polars-core/src/chunked_array/builder/list/primitive.rs @@ -133,7 +133,11 @@ where self.fast_explode = false; } let physical = s.to_physical_repr(); - let ca = physical.unpack::()?; + let ca = physical.unpack::().map_err(|_| { + polars_err!(SchemaMismatch: "cannot build list with different dtypes + +Expected {}, got {}.", self.field.dtype(), s.dtype()) + })?; let values = self.builder.mut_values(); ca.downcast_iter().for_each(|arr| { diff --git a/crates/polars-core/src/chunked_array/from_iterator_par.rs b/crates/polars-core/src/chunked_array/from_iterator_par.rs index a90b27da5722..7e194252506b 100644 --- a/crates/polars-core/src/chunked_array/from_iterator_par.rs +++ b/crates/polars-core/src/chunked_array/from_iterator_par.rs @@ -176,14 +176,14 @@ fn materialize_list( dtype: DataType, value_capacity: usize, list_capacity: usize, -) -> ListChunked { +) -> PolarsResult { let mut builder = get_list_builder(&dtype, value_capacity, list_capacity, name); for v in vectors { for val in v { - builder.append_opt_series(val.as_ref()).unwrap(); + builder.append_opt_series(val.as_ref())?; } } - builder.finish() + Ok(builder.finish()) } impl FromParallelIterator> for ListChunked { @@ -191,23 +191,60 @@ impl FromParallelIterator> for ListChunked { where I: IntoParallelIterator>, { - let vectors = collect_into_linked_list_vec(par_iter); + list_from_par_iter(par_iter, PlSmallStr::EMPTY).unwrap() + } +} - let list_capacity: usize = get_capacity_from_par_results(&vectors); - let value_capacity = get_value_cap(&vectors); - let dtype = get_dtype(&vectors); - if let DataType::Null = dtype { - ListChunked::full_null_with_dtype(PlSmallStr::EMPTY, list_capacity, &DataType::Null) - } else { - materialize_list( - PlSmallStr::EMPTY, - &vectors, - dtype, - value_capacity, - list_capacity, - ) +pub fn list_from_par_iter(par_iter: I, name: PlSmallStr) -> PolarsResult +where + I: IntoParallelIterator>, +{ + let vectors = collect_into_linked_list_vec(par_iter); + + let list_capacity: usize = get_capacity_from_par_results(&vectors); + let value_capacity = get_value_cap(&vectors); + let dtype = get_dtype(&vectors); + if let DataType::Null = dtype { + Ok(ListChunked::full_null_with_dtype( + name, + list_capacity, + &DataType::Null, + )) + } else { + materialize_list(name, &vectors, dtype, value_capacity, list_capacity) + } +} + +pub fn try_list_from_par_iter(par_iter: I, name: PlSmallStr) -> PolarsResult +where + I: IntoParallelIterator>>, +{ + fn ok(saved: &Mutex>) -> impl Fn(Result) -> Option + '_ { + move |item| match item { + Ok(item) => Some(item), + Err(error) => { + // We don't need a blocking `lock()`, as anybody + // else holding the lock will also be writing + // `Some(error)`, and then ours is irrelevant. + if let Ok(mut guard) = saved.try_lock() { + if guard.is_none() { + *guard = Some(error); + } + } + None + }, } } + + let saved_error = Mutex::new(None); + let iter = par_iter.into_par_iter().map(ok(&saved_error)).while_some(); + + let collection = list_from_par_iter(iter, name)?; + + match saved_error.into_inner().unwrap() { + Some(error) => Err(error), + None => Ok(collection), + } } impl FromParIterWithDtype> for ListChunked { @@ -221,7 +258,7 @@ impl FromParIterWithDtype> for ListChunked { let list_capacity: usize = get_capacity_from_par_results(&vectors); let value_capacity = get_value_cap(&vectors); if let DataType::List(dtype) = dtype { - materialize_list(name, &vectors, *dtype, value_capacity, list_capacity) + materialize_list(name, &vectors, *dtype, value_capacity, list_capacity).unwrap() } else { panic!("expected list dtype") } diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index f407323ce2a3..e6c5ae856ae7 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ b/crates/polars-expr/src/expressions/apply.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::sync::OnceLock; use polars_core::chunked_array::builder::get_list_builder; +use polars_core::chunked_array::from_iterator_par::try_list_from_par_iter; use polars_core::prelude::*; use polars_core::POOL; #[cfg(feature = "parquet")] @@ -181,7 +182,7 @@ impl ApplyExpr { out } else { - POOL.install(|| iter.collect::>())? + POOL.install(|| try_list_from_par_iter(iter, PlSmallStr::EMPTY))? } } else { agg.list() diff --git a/py-polars/tests/unit/test_errors.py b/py-polars/tests/unit/test_errors.py index 7bb1b40ce02e..79ed64dc88ac 100644 --- a/py-polars/tests/unit/test_errors.py +++ b/py-polars/tests/unit/test_errors.py @@ -726,3 +726,12 @@ def test_raise_column_not_found_in_join_arg() -> None: b = pl.DataFrame({"y": [1, 2, 3]}) with pytest.raises(pl.exceptions.ColumnNotFoundError): a.join(b, on="y") + + +def test_raise_on_different_results_20104() -> None: + df = pl.DataFrame({"x": [1, 2]}) + + with pytest.raises(pl.exceptions.SchemaError): + df.rolling("x", period="3i").agg( + result=pl.col("x").gather_every(2, offset=1).map_batches(pl.Series.min) + )