Skip to content

Commit

Permalink
perf: Cache rolling groups (#20675)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 12, 2025
1 parent e42560d commit f020634
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
16 changes: 9 additions & 7 deletions crates/polars-expr/src/expressions/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ impl PhysicalExpr for RollingExpr {
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let groups_key = format!("{:?}", &self.options);

let groups_map = state.group_tuples.read().unwrap();
// Groups must be set by expression runner.
let groups = groups_map.get(&groups_key);
let groups = {
let groups_map = state.group_tuples.lock().unwrap();
// Groups must be set by expression runner.
groups_map.get(&groups_key).cloned()
};

// There can be multiple rolling expressions in a single expr.
// E.g. `min().rolling() + max().rolling()`
// So if we hit that we will compute them here.
let groups = match groups {
Some(groups) => Cow::Borrowed(groups),
Some(groups) => groups,
None => {
// We cannot cache those as mutexes under rayon can deadlock.
// TODO! precompute all groups up front.
let (_time_key, _keys, groups) = df.rolling(vec![], &self.options)?;
Cow::Owned(groups)
let mut groups_map = state.group_tuples.lock().unwrap();
groups_map.insert(groups_key, groups.clone());
groups
},
};

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-expr/src/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl PhysicalExpr for WindowExpr {
window_function_format_order_by(&mut cache_key, e, options)
}

let mut gt_map_guard = state.group_tuples.write().unwrap();
let mut gt_map_guard = state.group_tuples.lock().unwrap();
// we run sequential and partitioned
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(gt_map_guard.len() <= 1);
Expand Down Expand Up @@ -683,7 +683,7 @@ fn materialize_column(join_opt_ids: &ChunkJoinOptIds, out_column: &Column) -> Co
fn cache_gb(gb: GroupBy, state: &ExecutionState, cache_key: &str) {
if state.cache_window() {
let groups = gb.take_groups();
let mut gt_map = state.group_tuples.write().unwrap();
let mut gt_map = state.group_tuples.lock().unwrap();
gt_map.insert(cache_key.to_string(), groups);
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use polars_ops::prelude::ChunkJoinOptIds;
use super::NodeTimer;

pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, ChunkJoinOptIds>>>;
pub type GroupsTypeCache = Arc<RwLock<PlHashMap<String, GroupPositions>>>;
pub type GroupsTypeCache = Arc<Mutex<PlHashMap<String, GroupPositions>>>;

bitflags! {
#[repr(transparent)]
Expand Down Expand Up @@ -178,7 +178,7 @@ impl ExecutionState {
/// Clear the cache used by the Window expressions
pub fn clear_window_expr_cache(&self) {
{
let mut lock = self.group_tuples.write().unwrap();
let mut lock = self.group_tuples.lock().unwrap();
lock.clear();
}
let mut lock = self.join_tuples.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/projection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn rolling_evaluate(
// Create a separate scope, so the lock is dropped, otherwise we deadlock when the
// rolling expression try to get read access.
{
let mut groups_map = state.group_tuples.write().unwrap();
let mut groups_map = state.group_tuples.lock().unwrap();
groups_map.insert(groups_key, groups);
}
partition
Expand Down

0 comments on commit f020634

Please sign in to comment.