Skip to content

Commit

Permalink
feat: Allow sorting of lists and arrays
Browse files Browse the repository at this point in the history
Fixes #17228.
  • Loading branch information
coastalwhite committed Dec 5, 2024
1 parent bdf4512 commit 60c6922
Show file tree
Hide file tree
Showing 21 changed files with 163 additions and 139 deletions.
120 changes: 32 additions & 88 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,11 @@ use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub(crate) fn convert_series_for_row_encoding(s: &Series) -> PolarsResult<Series> {
use DataType::*;
let out = match s.dtype() {
#[cfg(feature = "dtype-categorical")]
Categorical(_, _) | Enum(_, _) => s.rechunk(),
Binary | Boolean => s.clone(),
BinaryOffset => s.clone(),
String => s.clone(),
#[cfg(feature = "dtype-struct")]
Struct(_) => {
let ca = s.struct_().unwrap();
let new_fields = ca
.fields_as_series()
.iter()
.map(convert_series_for_row_encoding)
.collect::<PolarsResult<Vec<_>>>()?;
let mut out =
StructChunked::from_series(ca.name().clone(), ca.len(), new_fields.iter())?;
out.zip_outer_validity(ca);
out.into_series()
},
// we could fallback to default branch, but decimal is not numeric dtype for now, so explicit here
#[cfg(feature = "dtype-decimal")]
Decimal(_, _) => s.clone(),
List(inner) if !inner.is_nested() => s.clone(),
Null => s.clone(),
_ => {
let phys = s.to_physical_repr().into_owned();
polars_ensure!(
phys.dtype().is_numeric(),
InvalidOperation: "cannot sort column of dtype `{}`", s.dtype()
);
phys
},
};
Ok(out)
}

pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
let by = convert_series_for_row_encoding(by)?;
pub fn _get_rows_encoded_compat_array(by: &Series) -> ArrayRef {
let by = by.to_physical_repr();
let by = by.rechunk();

let out = match by.dtype() {
match by.dtype() {
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_, _) | DataType::Enum(_, _) => {
let ca = by.categorical().unwrap();
Expand All @@ -60,11 +22,10 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
},
// Take physical
_ => by.chunks()[0].clone(),
};
Ok(out)
}
}

pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> BinaryOffsetChunked {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);
Expand All @@ -74,21 +35,16 @@ pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryO
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
let rows = _get_rows_encoded_unordered(&sliced);
rows.into_array()
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
let chunks = POOL.install(|| chunks.collect::<Vec<_>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
BinaryOffsetChunked::from_chunk_iter(PlSmallStr::EMPTY, chunks)
}

// Almost the same but broadcast nulls to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(by: &[Series]) -> BinaryOffsetChunked {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);
Expand All @@ -98,7 +54,7 @@ pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
let rows = _get_rows_encoded_unordered(&sliced);

let validities = sliced
.iter()
Expand All @@ -113,14 +69,11 @@ pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
.collect::<Vec<_>>();

let validity = combine_validities_and_many(&validities);
Ok(rows.into_array().with_validity_typed(validity))
rows.into_array().with_validity_typed(validity)
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
let chunks = POOL.install(|| chunks.collect::<Vec<_>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
BinaryOffsetChunked::from_chunk_iter(PlSmallStr::EMPTY, chunks)
}

pub fn get_row_encoding_dictionary(dtype: &DataType) -> Option<RowEncodingCatOrder> {
Expand Down Expand Up @@ -202,15 +155,12 @@ pub fn get_row_encoding_dictionary(dtype: &DataType) -> Option<RowEncodingCatOrd
}
}

pub fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let rows = _get_rows_encoded_unordered(by)?;
Ok(BinaryOffsetChunked::with_chunk(
PlSmallStr::EMPTY,
rows.into_array(),
))
pub fn encode_rows_unordered(by: &[Series]) -> BinaryOffsetChunked {
let rows = _get_rows_encoded_unordered(by);
BinaryOffsetChunked::with_chunk(PlSmallStr::EMPTY, rows.into_array())
}

pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
pub fn _get_rows_encoded_unordered(by: &[Series]) -> RowsEncoded {
let mut cols = Vec::with_capacity(by.len());
let mut opts = Vec::with_capacity(by.len());
let mut dicts = Vec::with_capacity(by.len());
Expand All @@ -222,22 +172,18 @@ pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
for by in by {
debug_assert_eq!(by.len(), num_rows);

let arr = _get_rows_encoded_compat_array(by)?;
let arr = _get_rows_encoded_compat_array(by);
let opt = RowEncodingOptions::new_unsorted();
let dict = get_row_encoding_dictionary(by.dtype());

cols.push(arr);
opts.push(opt);
dicts.push(dict);
}
Ok(convert_columns(num_rows, &cols, &opts, &dicts))
convert_columns(num_rows, &cols, &opts, &dicts)
}

pub fn _get_rows_encoded(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
pub fn _get_rows_encoded(by: &[Column], descending: &[bool], nulls_last: &[bool]) -> RowsEncoded {
debug_assert_eq!(by.len(), descending.len());
debug_assert_eq!(by.len(), nulls_last.len());

Expand All @@ -253,39 +199,37 @@ pub fn _get_rows_encoded(
debug_assert_eq!(by.len(), num_rows);

let by = by.as_materialized_series();
let arr = _get_rows_encoded_compat_array(by)?;
let arr = _get_rows_encoded_compat_array(by);
let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
let dict = get_row_encoding_dictionary(by.dtype());

cols.push(arr);
opts.push(opt);
dicts.push(dict);
}
Ok(convert_columns(num_rows, &cols, &opts, &dicts))
convert_columns(num_rows, &cols, &opts, &dicts)
}

pub fn _get_rows_encoded_ca(
name: PlSmallStr,
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded(by, descending, nulls_last)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
) -> BinaryOffsetChunked {
BinaryOffsetChunked::with_chunk(
name,
_get_rows_encoded(by, descending, nulls_last).into_array(),
)
}

pub fn _get_rows_encoded_arr(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
_get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
) -> BinaryArray<i64> {
_get_rows_encoded(by, descending, nulls_last).into_array()
}

pub fn _get_rows_encoded_ca_unordered(
name: PlSmallStr,
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded_unordered(by)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
/// Row-encode `by` without any ordering options and wrap the result in a
/// single-chunk [`BinaryOffsetChunked`] named `name`.
pub fn _get_rows_encoded_ca_unordered(name: PlSmallStr, by: &[Series]) -> BinaryOffsetChunked {
    let encoded = _get_rows_encoded_unordered(by);
    BinaryOffsetChunked::with_chunk(name, encoded.into_array())
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn _arg_bottom_k(
by_column,
&sort_options.descending,
&sort_options.nulls_last,
)?;
);
let arr = encoded.into_array();
let mut rows = arr
.values_iter()
Expand Down
22 changes: 22 additions & 0 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use polars_utils::itertools::Itertools;

use self::row_encode::_get_rows_encoded;
use super::*;

// Reduce monomorphisation.
Expand Down Expand Up @@ -149,3 +152,22 @@ where

ChunkedArray::with_chunk(name, IdxArr::from_data_default(Buffer::from(idx), None))
}

/// Compute the sort permutation of `by` via the row format.
///
/// All key columns are row-encoded into one binary key per row; sorting those
/// keys lexically yields the multi-column sort order. `descending` /
/// `nulls_last` apply to every column; `parallel` sorts on the rayon pool.
pub(crate) fn arg_sort_row_fmt(
    by: &[Column],
    descending: bool,
    nulls_last: bool,
    parallel: bool,
) -> IdxCa {
    let encoded = _get_rows_encoded(by, &[descending], &[nulls_last]);
    // Pair every row key with its original index so we can recover the
    // permutation after sorting by key.
    let mut keyed: Vec<_> = encoded.iter().enumerate_idx().collect();

    if parallel {
        POOL.install(|| keyed.par_sort_by(|lhs, rhs| lhs.1.cmp(rhs.1)));
    } else {
        keyed.sort_by(|lhs, rhs| lhs.1.cmp(rhs.1));
    }

    // Indices produced this way contain no nulls.
    let indices: NoNull<IdxCa> = keyed.into_iter().map(|(idx, _)| idx).collect();
    indices.into_inner()
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ pub(crate) fn argsort_multiple_row_fmt(
mut descending: Vec<bool>,
mut nulls_last: Vec<bool>,
parallel: bool,
) -> PolarsResult<IdxCa> {
) -> IdxCa {
_broadcast_bools(by.len(), &mut descending);
_broadcast_bools(by.len(), &mut nulls_last);

let rows_encoded = _get_rows_encoded(by, &descending, &nulls_last)?;
let rows_encoded = _get_rows_encoded(by, &descending, &nulls_last);
let mut items: Vec<_> = rows_encoded.iter().enumerate_idx().collect();

if parallel {
Expand All @@ -103,5 +103,5 @@ pub(crate) fn argsort_multiple_row_fmt(
}

let ca: NoNull<IdxCa> = items.into_iter().map(|tpl| tpl.0).collect();
Ok(ca.into_inner())
ca.into_inner()
}
20 changes: 8 additions & 12 deletions crates/polars-core/src/chunked_array/ops/sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod categorical;

use std::cmp::Ordering;

pub(crate) use arg_sort::arg_sort_row_fmt;
pub(crate) use arg_sort_multiple::argsort_multiple_row_fmt;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::Buffer;
Expand All @@ -18,9 +19,7 @@ use compare_inner::NonNull;
use rayon::prelude::*;
pub use slice::*;

use crate::chunked_array::ops::row_encode::{
_get_rows_encoded_ca, convert_series_for_row_encoding,
};
use crate::chunked_array::ops::row_encode::_get_rows_encoded_ca;
use crate::prelude::compare_inner::TotalOrdInner;
use crate::prelude::sort::arg_sort_multiple::*;
use crate::prelude::*;
Expand Down Expand Up @@ -608,8 +607,7 @@ impl StructChunked {
&[self.clone().into_column()],
&[options.descending],
&[options.nulls_last],
)
.unwrap();
);
bin.arg_sort(Default::default())
}
}
Expand All @@ -627,7 +625,7 @@ impl ChunkSort<StructType> for StructChunked {
}

fn arg_sort(&self, options: SortOptions) -> IdxCa {
let bin = self.get_row_encoded(options).unwrap();
let bin = self.get_row_encoded(options);
bin.arg_sort(Default::default())
}
}
Expand Down Expand Up @@ -726,21 +724,19 @@ pub fn _broadcast_bools(n_cols: usize, values: &mut Vec<bool>) {
pub(crate) fn prepare_arg_sort(
columns: Vec<Column>,
sort_options: &mut SortMultipleOptions,
) -> PolarsResult<(Column, Vec<Column>)> {
) -> (Column, Vec<Column>) {
let n_cols = columns.len();

let mut columns = columns
.iter()
.map(Column::as_materialized_series)
.map(convert_series_for_row_encoding)
.map(|s| s.map(Column::from))
.collect::<PolarsResult<Vec<_>>>()?;
.map(Column::to_physical_repr)
.collect::<Vec<_>>();

_broadcast_bools(n_cols, &mut sort_options.descending);
_broadcast_bools(n_cols, &mut sort_options.nulls_last);

let first = columns.remove(0);
Ok((first, columns))
(first, columns)
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ impl StructChunked {
})
}

pub fn get_row_encoded_array(&self, options: SortOptions) -> PolarsResult<BinaryArray<i64>> {
pub fn get_row_encoded_array(&self, options: SortOptions) -> BinaryArray<i64> {
let c = self.clone().into_column();
_get_rows_encoded_arr(&[c], &[options.descending], &[options.nulls_last])
}

pub fn get_row_encoded(&self, options: SortOptions) -> PolarsResult<BinaryOffsetChunked> {
pub fn get_row_encoded(&self, options: SortOptions) -> BinaryOffsetChunked {
let c = self.clone().into_column();
_get_rows_encoded_ca(
self.name().clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/frame/group_by/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ impl IntoGroupsProxy for ListChunked {
multithreaded &= POOL.current_num_threads() > 1;
let by = &[self.clone().into_series()];
let ca = if multithreaded {
encode_rows_vertical_par_unordered(by).unwrap()
encode_rows_vertical_par_unordered(by)
} else {
_get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, by).unwrap()
_get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, by)
};

ca.group_tuples(multithreaded, sorted)
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl DataFrame {
encode_rows_vertical_par_unordered(&by)
} else {
encode_rows_unordered(&by)
}?
}
.into_series();
rows.group_tuples(multithreaded, sorted)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2077,9 +2077,9 @@ impl DataFrame {
sort_options.descending,
sort_options.nulls_last,
sort_options.multithreaded,
)?
)
} else {
let (first, other) = prepare_arg_sort(by_column, &mut sort_options)?;
let (first, other) = prepare_arg_sort(by_column, &mut sort_options);
first
.as_materialized_series()
.arg_sort_multiple(&other, &sort_options)?
Expand Down
Loading

0 comments on commit 60c6922

Please sign in to comment.