diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index e7b1a75086d4..d0357fb12b38 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -266,7 +266,7 @@ message PhysicalPlanNode { ProjectionExecNode projection = 4; GlobalLimitExecNode global_limit = 6; LocalLimitExecNode local_limit = 7; - HashAggregateExecNode hash_aggregate = 8; + AggregateExecNode aggregate = 8; HashJoinExecNode hash_join = 9; ShuffleReaderExecNode shuffle_reader = 10; SortExecNode sort = 11; @@ -519,7 +519,7 @@ message WindowAggExecNode { datafusion.Schema input_schema = 4; } -message HashAggregateExecNode { +message AggregateExecNode { repeated PhysicalExprNode group_expr = 1; repeated PhysicalExprNode aggr_expr = 2; AggregateMode mode = 3; diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index ed268820f394..f5b495b67f1d 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -28,7 +28,8 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::window_frames::WindowFrame; use datafusion::logical_plan::FunctionRegistry; -use datafusion::physical_plan::aggregates::create_aggregate_expr; +use datafusion::physical_plan::aggregates::AggregateExec; +use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateMode}; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::cross_join::CrossJoinExec; @@ -39,7 +40,6 @@ use datafusion::physical_plan::file_format::{ AvroExec, CsvExec, FileScanConfig, ParquetExec, }; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::projection::ProjectionExec; @@ -306,19 +306,21 @@ impl AsExecutionPlan for PhysicalPlanNode { Arc::new((&input_schema).try_into()?), )?)) } - PhysicalPlanType::HashAggregate(hash_agg) => { + PhysicalPlanType::Aggregate(hash_agg) => { let input: Arc = into_physical_plan!( hash_agg.input, registry, runtime, extension_codec )?; - let mode = protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(|| { - proto_error(format!( - "Received a HashAggregateNode message with unknown AggregateMode {}", + let mode = protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else( + || { + proto_error(format!( + "Received a AggregateNode message with unknown AggregateMode {}", hash_agg.mode )) - })?; + }, + )?; let agg_mode: AggregateMode = match mode { protobuf::AggregateMode::Partial => AggregateMode::Partial, protobuf::AggregateMode::Final => AggregateMode::Final, @@ -341,7 +343,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .as_ref() .ok_or_else(|| { BallistaError::General( - "input_schema in HashAggregateNode is missing.".to_owned(), + "input_schema in AggregateNode is missing.".to_owned(), ) })? .clone(); @@ -384,14 +386,14 @@ impl AsExecutionPlan for PhysicalPlanNode { )?) 
} _ => Err(BallistaError::General( - "Invalid aggregate expression for HashAggregateExec" + "Invalid aggregate expression for AggregateExec" .to_string(), )), } }) .collect::, _>>()?; - Ok(Arc::new(HashAggregateExec::try_new( + Ok(Arc::new(AggregateExec::try_new( agg_mode, group, physical_aggr_expr, @@ -730,7 +732,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }, ))), }) - } else if let Some(exec) = plan.downcast_ref::() { + } else if let Some(exec) = plan.downcast_ref::() { let groups = exec .group_expr() .iter() @@ -768,8 +770,8 @@ impl AsExecutionPlan for PhysicalPlanNode { extension_codec, )?; Ok(protobuf::PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::HashAggregate(Box::new( - protobuf::HashAggregateExecNode { + physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new( + protobuf::AggregateExecNode { group_expr: groups, group_expr_name: group_names, aggr_expr: agg, @@ -1080,12 +1082,12 @@ mod roundtrip_tests { datasource::listing::PartitionedFile, logical_plan::{JoinType, Operator}, physical_plan::{ + aggregates::{AggregateExec, AggregateMode}, empty::EmptyExec, expressions::{binary, col, lit, InListExpr, NotExpr}, expressions::{Avg, Column, PhysicalSortExpr}, file_format::{FileScanConfig, ParquetExec}, filter::FilterExec, - hash_aggregate::{AggregateMode, HashAggregateExec}, hash_join::{HashJoinExec, PartitionMode}, limit::{GlobalLimitExec, LocalLimitExec}, sorts::sort::SortExec, @@ -1212,7 +1214,7 @@ mod roundtrip_tests { } #[test] - fn rountrip_hash_aggregate() -> Result<()> { + fn rountrip_aggregate() -> Result<()> { let field_a = Field::new("a", DataType::Int64, false); let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); @@ -1226,7 +1228,7 @@ mod roundtrip_tests { DataType::Float64, ))]; - roundtrip_test(Arc::new(HashAggregateExec::try_new( + roundtrip_test(Arc::new(AggregateExec::try_new( AggregateMode::Final, groups.clone(), aggregates.clone(), diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 6670ab5cedd8..85a557e437ae 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -48,9 +48,9 @@ use datafusion::physical_plan::common::batch_byte_size; use datafusion::physical_plan::empty::EmptyExec; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::physical_plan::aggregates::AggregateExec; use datafusion::physical_plan::file_format::{CsvExec, ParquetExec}; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sorts::sort::SortExec; @@ -151,8 +151,8 @@ fn build_exec_plan_diagram( id: &mut AtomicUsize, draw_entity: bool, ) -> Result { - let operator_str = if plan.as_any().downcast_ref::().is_some() { - "HashAggregateExec" + let operator_str = if plan.as_any().downcast_ref::().is_some() { + "AggregateExec" } else if plan.as_any().downcast_ref::().is_some() { "SortExec" } else if plan.as_any().downcast_ref::().is_some() { diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 8198c4ed27c0..e855f0796ada 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -276,8 +276,8 @@ mod test { use ballista_core::error::BallistaError; use ballista_core::execution_plans::UnresolvedShuffleExec; use 
ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec}; + use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; - use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{ @@ -298,7 +298,7 @@ mod test { } #[tokio::test] - async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { + async fn distributed_aggregate_plan() -> Result<(), BallistaError> { let ctx = datafusion_test_context("testdata").await?; // simplified form of TPC-H query 1 @@ -327,12 +327,12 @@ mod test { /* Expected result: ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2)) - HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] + AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false ShuffleWriterExec: None ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price] - HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] + AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] CoalesceBatchesExec: target_batch_size=4096 UnresolvedShuffleExec @@ -346,14 +346,14 @@ mod test { // verify stage 0 let stage0 = stages[0].children()[0].clone(); - let partial_hash = downcast_exec!(stage0, HashAggregateExec); + let partial_hash = downcast_exec!(stage0, AggregateExec); assert!(*partial_hash.mode() == AggregateMode::Partial); // verify stage 1 let stage1 = stages[1].children()[0].clone(); let projection = downcast_exec!(stage1, ProjectionExec); let final_hash = projection.children()[0].clone(); - let final_hash = downcast_exec!(final_hash, HashAggregateExec); + let final_hash = downcast_exec!(final_hash, AggregateExec); assert!(*final_hash.mode() == AggregateMode::FinalPartitioned); let coalesce = final_hash.children()[0].clone(); let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec); @@ -449,7 +449,7 @@ order by CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2)) - HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] CoalesceBatchesExec: target_batch_size=4096 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] CoalesceBatchesExec: 
target_batch_size=4096 @@ -459,7 +459,7 @@ order by ShuffleWriterExec: None ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count] - HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] CoalesceBatchesExec: target_batch_size=4096 UnresolvedShuffleExec @@ -514,7 +514,7 @@ order by .partition_count() ); - let hash_agg = downcast_exec!(input, HashAggregateExec); + let hash_agg = downcast_exec!(input, AggregateExec); let coalesce_batches = hash_agg.children()[0].clone(); let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec); @@ -560,7 +560,7 @@ order by } #[tokio::test] - async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { + async fn roundtrip_serde_aggregate() -> Result<(), BallistaError> { let ctx = datafusion_test_context("testdata").await?; // simplified form of TPC-H query 1 @@ -586,8 +586,8 @@ order by let partial_hash = stages[0].children()[0].clone(); let partial_hash_serde = roundtrip_operator(partial_hash.clone())?; - let partial_hash = downcast_exec!(partial_hash, HashAggregateExec); - let partial_hash_serde = downcast_exec!(partial_hash_serde, HashAggregateExec); + let partial_hash = downcast_exec!(partial_hash, AggregateExec); + let partial_hash_serde = downcast_exec!(partial_hash_serde, AggregateExec); assert_eq!( format!("{:?}", partial_hash), diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index fd08d3a0a2e7..055a17f4e907 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -158,7 +158,7 @@ //! //! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec) //! * Filter: [`FilterExec`](physical_plan::filter::FilterExec) -//! * Hash and Grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec) +//! * Grouped and non-grouped aggregations: [`AggregateExec`](physical_plan::aggregates::AggregateExec) //! * Sort: [`SortExec`](physical_plan::sorts::sort::SortExec) //! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec) //! 
* Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 9af053f934fb..9c548ab9b5fb 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use arrow::datatypes::Schema; use crate::execution::context::SessionConfig; +use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::empty::EmptyExec; -use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::{ expressions, AggregateExpr, ColumnStatistics, ExecutionPlan, Statistics, @@ -53,8 +53,8 @@ impl PhysicalOptimizerRule for AggregateStatistics { if let Some(partial_agg_exec) = take_optimizable(&*plan) { let partial_agg_exec = partial_agg_exec .as_any() - .downcast_ref::<HashAggregateExec>() - .expect("take_optimizable() ensures that this is a HashAggregateExec"); + .downcast_ref::<AggregateExec>() + .expect("take_optimizable() ensures that this is a AggregateExec"); let stats = partial_agg_exec.input().statistics(); let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { @@ -96,22 +96,22 @@ impl PhysicalOptimizerRule for AggregateStatistics { } } -/// assert if the node passed as argument is a final `HashAggregateExec` node that can be optimized: -/// - its child (with posssible intermediate layers) is a partial `HashAggregateExec` node +/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized: +/// - its child (with possible intermediate layers) is a partial `AggregateExec` node /// - they both have no grouping expression /// - the statistics are exact -/// If this is the case, return a ref to the partial `HashAggregateExec`, else `None`. -/// We would have prefered to return a casted ref to HashAggregateExec but the recursion requires +/// If this is the case, return a ref to the partial `AggregateExec`, else `None`. +/// We would have preferred to return a casted ref to AggregateExec but the recursion requires /// the `ExecutionPlan.children()` method that returns an owned reference.
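The descent that this comment describes can be seen in miniature below. This is a self-contained sketch: the `Plan` trait, its two methods, and `take_optimizable_sketch` are names invented for illustration, while the real rule that follows works on `Arc<dyn ExecutionPlan>` and performs both tests with `as_any().downcast_ref::<AggregateExec>()` plus a `group_expr().is_empty()` check.

```rust
use std::sync::Arc;

// Stand-in for `ExecutionPlan`; the predicate models the downcast and
// group-expression checks performed by the real rule.
trait Plan {
    fn children(&self) -> Vec<Arc<dyn Plan>>;
    fn is_partial_agg_without_groups(&self) -> bool;
}

// Descend through single-child wrappers (e.g. a CoalescePartitionsExec)
// beneath the final aggregate until a partial aggregate with no grouping
// is found. The `Arc` is cloned out of `children()`, which is why an owned
// value is returned instead of a `&AggregateExec`.
fn take_optimizable_sketch(final_agg_input: Arc<dyn Plan>) -> Option<Arc<dyn Plan>> {
    let mut child = final_agg_input;
    loop {
        if child.is_partial_agg_without_groups() {
            return Some(child);
        }
        let mut grandchildren = child.children();
        if grandchildren.len() != 1 {
            return None; // e.g. a join: nothing may sit between the two aggregates
        }
        child = grandchildren.remove(0);
    }
}

struct PartialAgg;
struct Passthrough(Arc<dyn Plan>);

impl Plan for PartialAgg {
    fn children(&self) -> Vec<Arc<dyn Plan>> {
        vec![]
    }
    fn is_partial_agg_without_groups(&self) -> bool {
        true
    }
}

impl Plan for Passthrough {
    fn children(&self) -> Vec<Arc<dyn Plan>> {
        vec![self.0.clone()]
    }
    fn is_partial_agg_without_groups(&self) -> bool {
        false
    }
}

fn main() {
    // Final aggregate input -> intermediate wrapper -> partial aggregate.
    let plan: Arc<dyn Plan> = Arc::new(Passthrough(Arc::new(PartialAgg)));
    assert!(take_optimizable_sketch(plan).is_some());
}
```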
fn take_optimizable(node: &dyn ExecutionPlan) -> Option> { - if let Some(final_agg_exec) = node.as_any().downcast_ref::() { + if let Some(final_agg_exec) = node.as_any().downcast_ref::() { if final_agg_exec.mode() == &AggregateMode::Final && final_agg_exec.group_expr().is_empty() { let mut child = Arc::clone(final_agg_exec.input()); loop { if let Some(partial_agg_exec) = - child.as_any().downcast_ref::() + child.as_any().downcast_ref::() { if partial_agg_exec.mode() == &AggregateMode::Partial && partial_agg_exec.group_expr().is_empty() @@ -260,11 +260,11 @@ mod tests { use crate::error::Result; use crate::logical_plan::Operator; + use crate::physical_plan::aggregates::AggregateExec; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::common; use crate::physical_plan::expressions::Count; use crate::physical_plan::filter::FilterExec; - use crate::physical_plan::hash_aggregate::HashAggregateExec; use crate::physical_plan::memory::MemoryExec; use crate::prelude::SessionContext; @@ -291,10 +291,7 @@ mod tests { } /// Checks that the count optimization was applied and we still get the right result - async fn assert_count_optim_success( - plan: HashAggregateExec, - nulls: bool, - ) -> Result<()> { + async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let conf = session_ctx.copied_config(); @@ -336,7 +333,7 @@ mod tests { let source = mock_data()?; let schema = source.schema(); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(None, None)], @@ -344,7 +341,7 @@ mod tests { Arc::clone(&schema), )?; - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], vec![count_expr(None, None)], @@ -363,7 +360,7 @@ mod tests { let source = mock_data()?; let schema = source.schema(); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(Some(&schema), Some("a"))], @@ -371,7 +368,7 @@ mod tests { Arc::clone(&schema), )?; - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], vec![count_expr(Some(&schema), Some("a"))], @@ -389,7 +386,7 @@ mod tests { let source = mock_data()?; let schema = source.schema(); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(None, None)], @@ -400,7 +397,7 @@ mod tests { // We introduce an intermediate optimization step between the partial and final aggregtator let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], vec![count_expr(None, None)], @@ -418,7 +415,7 @@ mod tests { let source = mock_data()?; let schema = source.schema(); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(Some(&schema), Some("a"))], @@ -429,7 +426,7 @@ mod tests { // We introduce an intermediate optimization step between the partial and final aggregtator let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], 
vec![count_expr(Some(&schema), Some("a"))], @@ -458,7 +455,7 @@ mod tests { source, )?); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(None, None)], @@ -466,7 +463,7 @@ mod tests { Arc::clone(&schema), )?; - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], vec![count_expr(None, None)], @@ -479,7 +476,7 @@ mod tests { AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); + assert!(optimized.as_any().is::()); Ok(()) } @@ -500,7 +497,7 @@ mod tests { source, )?); - let partial_agg = HashAggregateExec::try_new( + let partial_agg = AggregateExec::try_new( AggregateMode::Partial, vec![], vec![count_expr(Some(&schema), Some("a"))], @@ -508,7 +505,7 @@ mod tests { Arc::clone(&schema), )?; - let final_agg = HashAggregateExec::try_new( + let final_agg = AggregateExec::try_new( AggregateMode::Final, vec![], vec![count_expr(Some(&schema), Some("a"))], @@ -521,7 +518,7 @@ mod tests { AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); + assert!(optimized.as_any().is::()); Ok(()) } diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 50f8abe7f0a1..2b8582126285 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -62,9 +62,9 @@ impl PhysicalOptimizerRule for CoalesceBatches { || plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some(); - // TODO we should also do this for HashAggregateExec but we need to update tests + // TODO we should also do this for AggregateExec but we need to update tests // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 - // || plan_any.downcast_ref::().is_some(); + // || plan_any.downcast_ref::().is_some(); if plan.children().is_empty() { // leaf node, children cannot be replaced diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 2506348fe7a0..3b2c4515eafd 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -241,10 +241,10 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; + use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; - use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::sorts::sort::SortExec; @@ -300,15 +300,15 @@ mod tests { Arc::new(ProjectionExec::try_new(exprs, input).unwrap()) } - fn hash_aggregate(input: Arc) -> Arc { + fn aggregate(input: Arc) -> Arc { let schema = schema(); Arc::new( - HashAggregateExec::try_new( + AggregateExec::try_new( AggregateMode::Final, vec![], vec![], Arc::new( - HashAggregateExec::try_new( + AggregateExec::try_new( AggregateMode::Partial, vec![], vec![], @@ -361,11 +361,11 @@ mod tests { #[test] fn 
added_repartition_to_single_partition() -> Result<()> { - let plan = hash_aggregate(parquet_exec()); + let plan = aggregate(parquet_exec()); let expected = [ - "HashAggregateExec: mode=Final, gby=[], aggr=[]", - "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "AggregateExec: mode=Final, gby=[], aggr=[]", + "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", ]; @@ -376,11 +376,11 @@ mod tests { #[test] fn repartition_deepest_node() -> Result<()> { - let plan = hash_aggregate(filter_exec(parquet_exec())); + let plan = aggregate(filter_exec(parquet_exec())); let expected = &[ - "HashAggregateExec: mode=Final, gby=[], aggr=[]", - "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "AggregateExec: mode=Final, gby=[], aggr=[]", + "AggregateExec: mode=Partial, gby=[], aggr=[]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", @@ -443,11 +443,11 @@ mod tests { #[test] fn repartition_ignores_limit() -> Result<()> { - let plan = hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))); + let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))); let expected = &[ - "HashAggregateExec: mode=Final, gby=[], aggr=[]", - "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "AggregateExec: mode=Final, gby=[], aggr=[]", + "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: limit=100", "LocalLimitExec: limit=100", diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs new file mode 100644 index 000000000000..d5a3253678c6 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/hash.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines the execution plan for the hash aggregate operation + +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::vec; + +use ahash::RandomState; +use futures::{ + ready, + stream::{Stream, StreamExt}, +}; + +use crate::error::Result; +use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode}; +use crate::physical_plan::hash_utils::create_hashes; +use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; +use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; +use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; +use crate::scalar::ScalarValue; + +use arrow::{array::ArrayRef, compute, compute::cast}; +use arrow::{ + array::{Array, UInt32Builder}, + error::{ArrowError, Result as ArrowResult}, +}; +use arrow::{ + datatypes::{Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use hashbrown::raw::RawTable; + +/* +The architecture is the following: + +1. An accumulator has state that is updated on each batch. +2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row +3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch. +4. The state's RecordBatch is `merge`d to a new state +5. The state is mapped to the final value + +Why: + +* Accumulators' state can be statically typed, but it is more efficient to transmit data from the accumulators via `Array` +* The `merge` operation must have access to the state of the aggregators because it uses it to correctly merge +* It uses Arrow's native dynamically typed object, `Array`. +* Arrow shines in batch operations and both `merge` and `concatenate` of uniform types are very performant. + +Example: average + +* the state is `n: u32` and `sum: f64` +* For every batch, we update them accordingly. +* At the end of the accumulation (of a partition), we convert `n` and `sum` to a RecordBatch of 1 row and two columns: `[n, sum]` +* The RecordBatch is (sent back / transmitted over network) +* Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns. +* Finally, `get_value` returns an array with one entry computed from the state +*/ +pub(crate) struct GroupedHashAggregateStream { + schema: SchemaRef, + input: SendableRecordBatchStream, + mode: AggregateMode, + accumulators: Accumulators, + aggregate_expressions: Vec>>, + + aggr_expr: Vec>, + group_expr: Vec>, + + baseline_metrics: BaselineMetrics, + random_state: RandomState, + finished: bool, +} + +impl GroupedHashAggregateStream { + /// Create a new GroupedHashAggregateStream + pub fn new( + mode: AggregateMode, + schema: SchemaRef, + group_expr: Vec>, + aggr_expr: Vec>, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + ) -> Result { + let timer = baseline_metrics.elapsed_compute().timer(); + + // The expressions to evaluate the batch, one vec of expressions per aggregation. + // Assume create_schema() always put group columns in front of aggr columns, we set + // col_idx_base to group expression count. 
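The average walk-through in the architecture comment above can be made concrete with a toy accumulator. This sketch uses plain `f64` values and an invented `AvgState` type; the real `Accumulator` trait traffics in `ScalarValue`s and Arrow arrays, but the division of labour is the same: the `Partial` stage folds raw input and ships its state, and the `Final`/`FinalPartitioned` stage merges shipped states before producing the value.

```rust
// Toy accumulator for AVG: state is `(n, sum)`, final value is `sum / n`.
#[derive(Default)]
struct AvgState {
    n: u64,
    sum: f64,
}

impl AvgState {
    // AggregateMode::Partial: fold raw input values into the state.
    fn update_batch(&mut self, values: &[f64]) {
        self.n += values.len() as u64;
        self.sum += values.iter().sum::<f64>();
    }

    // The state shipped between stages: conceptually two columns, n and sum.
    fn state(&self) -> (u64, f64) {
        (self.n, self.sum)
    }

    // AggregateMode::Final / FinalPartitioned: merge shipped states, not raw
    // values -- this is why the final stages call merge rather than update.
    fn merge_batch(&mut self, states: &[(u64, f64)]) {
        for (n, sum) in states {
            self.n += n;
            self.sum += sum;
        }
    }

    // Map the merged state to the final value.
    fn evaluate(&self) -> Option<f64> {
        (self.n > 0).then(|| self.sum / self.n as f64)
    }
}

fn main() {
    // Two partitions build partial states in parallel...
    let (mut p1, mut p2) = (AvgState::default(), AvgState::default());
    p1.update_batch(&[1.0, 2.0]);
    p2.update_batch(&[3.0, 4.0]);

    // ...and a single final aggregate merges the shipped states.
    let mut fin = AvgState::default();
    fin.merge_batch(&[p1.state(), p2.state()]);
    assert_eq!(fin.evaluate(), Some(2.5));
}
```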
+ let aggregate_expressions = + aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?; + + timer.done(); + + Ok(Self { + schema, + mode, + input, + aggr_expr, + group_expr, + baseline_metrics, + aggregate_expressions, + accumulators: Default::default(), + random_state: Default::default(), + finished: false, + }) + } +} + +impl Stream for GroupedHashAggregateStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + if this.finished { + return Poll::Ready(None); + } + + let elapsed_compute = this.baseline_metrics.elapsed_compute(); + + loop { + let result = match ready!(this.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = group_aggregate_batch( + &this.mode, + &this.random_state, + &this.group_expr, + &this.aggr_expr, + batch, + &mut this.accumulators, + &this.aggregate_expressions, + ); + + timer.done(); + + match result { + Ok(_) => continue, + Err(e) => Err(ArrowError::ExternalError(Box::new(e))), + } + } + Some(Err(e)) => Err(e), + None => { + this.finished = true; + let timer = this.baseline_metrics.elapsed_compute().timer(); + let result = create_batch_from_map( + &this.mode, + &this.accumulators, + this.group_expr.len(), + &this.schema, + ) + .record_output(&this.baseline_metrics); + + timer.done(); + result + } + }; + + this.finished = true; + return Poll::Ready(Some(result)); + } + } +} + +impl RecordBatchStream for GroupedHashAggregateStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// TODO: Make this a member function of [`GroupedHashAggregateStream`] +fn group_aggregate_batch( + mode: &AggregateMode, + random_state: &RandomState, + group_expr: &[Arc], + aggr_expr: &[Arc], + batch: RecordBatch, + accumulators: &mut Accumulators, + aggregate_expressions: &[Vec>], +) -> Result<()> { + // evaluate the grouping expressions + let group_values = evaluate(group_expr, &batch)?; + + // evaluate the aggregation expressions. + // We could evaluate them after the `take`, but since we need to evaluate all + // of them anyways, it is more performant to do it while they are together. 
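Before the row-by-row walk that follows, the hashing scheme is worth seeing in miniature: the table stores only `(hash, group index)` pairs, so every lookup re-checks the actual group values to guard against hash collisions. In this self-contained sketch, a std `HashMap` stands in for hashbrown's `RawTable`, string keys stand in for the `ScalarValue` group values, and `group_rows` is an invented name.

```rust
use std::collections::HashMap;

// Map each row to a group. Only `hash -> candidate group indices` lives in
// the map; the group key itself lives in `groups`, mirroring how the real
// code keeps `GroupState::group_by_values` outside the RawTable.
fn group_rows(keys: &[&str], hash: impl Fn(&str) -> u64) -> Vec<(String, Vec<usize>)> {
    let mut map: HashMap<u64, Vec<usize>> = HashMap::new();
    let mut groups: Vec<(String, Vec<usize>)> = Vec::new();

    for (row, &key) in keys.iter().enumerate() {
        let h = hash(key);
        // Probe: a matching hash is not enough, re-compare the group value
        // (the real code uses `ScalarValue::eq_array` against the column).
        let found = map
            .get(&h)
            .and_then(|c| c.iter().copied().find(|&g| groups[g].0 == key));
        match found {
            // Existing group: remember this row's index.
            Some(g) => groups[g].1.push(row),
            // New group: record its key and register it under the hash.
            None => {
                let g = groups.len();
                groups.push((key.to_string(), vec![row]));
                map.entry(h).or_default().push(g);
            }
        }
    }
    groups
}

fn main() {
    // Even with a pathological hash that collides everything, the value
    // re-check keeps the groups correct.
    let groups = group_rows(&["a", "b", "a", "b", "b"], |_| 0);
    assert_eq!(groups[0], ("a".to_string(), vec![0, 2]));
    assert_eq!(groups[1], ("b".to_string(), vec![1, 3, 4]));
}
```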
+ let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?; + + // 1.1 construct the key from the group values + // 1.2 construct the mapping key if it does not exist + // 1.3 add the row' index to `indices` + + // track which entries in `accumulators` have rows in this batch to aggregate + let mut groups_with_rows = vec![]; + + // 1.1 Calculate the group keys for the group values + let mut batch_hashes = vec![0; batch.num_rows()]; + create_hashes(&group_values, random_state, &mut batch_hashes)?; + + for (row, hash) in batch_hashes.into_iter().enumerate() { + let Accumulators { map, group_states } = accumulators; + + let entry = map.get_mut(hash, |(_hash, group_idx)| { + // verify that a group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + let group_state = &group_states[*group_idx]; + group_values + .iter() + .zip(group_state.group_by_values.iter()) + .all(|(array, scalar)| scalar.eq_array(array, row)) + }); + + match entry { + // Existing entry for this group value + Some((_hash, group_idx)) => { + let group_state = &mut group_states[*group_idx]; + // 1.3 + if group_state.indices.is_empty() { + groups_with_rows.push(*group_idx); + }; + group_state.indices.push(row as u32); // remember this row + } + // 1.2 Need to create new entry + None => { + let accumulator_set = aggregates::create_accumulators(aggr_expr)?; + + // Copy group values out of arrays into `ScalarValue`s + let group_by_values = group_values + .iter() + .map(|col| ScalarValue::try_from_array(col, row)) + .collect::>>()?; + + // Add new entry to group_states and save newly created index + let group_state = GroupState { + group_by_values: group_by_values.into_boxed_slice(), + accumulator_set, + indices: vec![row as u32], // 1.3 + }; + let group_idx = group_states.len(); + group_states.push(group_state); + groups_with_rows.push(group_idx); + + // for hasher function, use precomputed hash value + map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash); + } + }; + } + + // Collect all indices + offsets based on keys in this vec + let mut batch_indices: UInt32Builder = UInt32Builder::new(0); + let mut offsets = vec![0]; + let mut offset_so_far = 0; + for group_idx in groups_with_rows.iter() { + let indices = &accumulators.group_states[*group_idx].indices; + batch_indices.append_slice(indices)?; + offset_so_far += indices.len(); + offsets.push(offset_so_far); + } + let batch_indices = batch_indices.finish(); + + // `Take` all values based on indices into Arrays + let values: Vec>> = aggr_input_values + .iter() + .map(|array| { + array + .iter() + .map(|array| { + compute::take( + array.as_ref(), + &batch_indices, + None, // None: no index check + ) + .unwrap() + }) + .collect() + // 2.3 + }) + .collect(); + + // 2.1 for each key in this batch + // 2.2 for each aggregation + // 2.3 `slice` from each of its arrays the keys' values + // 2.4 update / merge the accumulator with the values + // 2.5 clear indices + groups_with_rows + .iter() + .zip(offsets.windows(2)) + .try_for_each(|(group_idx, offsets)| { + let group_state = &mut accumulators.group_states[*group_idx]; + // 2.2 + group_state + .accumulator_set + .iter_mut() + .zip(values.iter()) + .map(|(accumulator, aggr_array)| { + ( + accumulator, + aggr_array + .iter() + .map(|array| { + // 2.3 + array.slice(offsets[0], offsets[1] - offsets[0]) + }) + .collect::>(), + ) + }) + .try_for_each(|(accumulator, values)| match mode { + AggregateMode::Partial => 
accumulator.update_batch(&values), + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch(&values) + } + }) + // 2.5 + .and({ + group_state.indices.clear(); + Ok(()) + }) + })?; + + Ok(()) +} + +/// The state that is built for each output group. +#[derive(Debug)] +struct GroupState { + /// The actual group by values, one for each group column + group_by_values: Box<[ScalarValue]>, + + // Accumulator state, one for each aggregate + accumulator_set: Vec<AccumulatorItem>, + + /// scratch space used to collect indices for input rows in a + /// batch that have values to aggregate. Reset on each batch + indices: Vec<u32>, +} + +/// The state of all the groups +#[derive(Default)] +struct Accumulators { + /// Logically maps group values to an index in `group_states` + /// + /// Uses the raw API of hashbrown to avoid actually storing the + /// keys in the table + /// + /// keys: u64 hashes of the GroupValue + /// values: (hash, index into `group_states`) + map: RawTable<(u64, usize)>, + + /// State for each group + group_states: Vec<GroupState>, +} + +impl std::fmt::Debug for Accumulators { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // hashes are not stored inline, so could only get values + let map_string = "RawTable"; + f.debug_struct("Accumulators") + .field("map", &map_string) + .field("group_states", &self.group_states) + .finish() + } +} + +/// Evaluates expressions against a record batch. +fn evaluate( + expr: &[Arc<dyn PhysicalExpr>], + batch: &RecordBatch, +) -> Result<Vec<ArrayRef>> { + expr.iter() + .map(|expr| expr.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect::<Result<Vec<_>>>() +} + +/// Evaluates expressions against a record batch. +fn evaluate_many( + expr: &[Vec<Arc<dyn PhysicalExpr>>], + batch: &RecordBatch, +) -> Result<Vec<Vec<ArrayRef>>> { + expr.iter() + .map(|expr| evaluate(expr, batch)) + .collect::<Result<Vec<_>>>() +} + +/// Create a RecordBatch with all group keys and accumulators' states or values. +fn create_batch_from_map( + mode: &AggregateMode, + accumulators: &Accumulators, + num_group_expr: usize, + output_schema: &Schema, +) -> ArrowResult<RecordBatch> { + if accumulators.group_states.is_empty() { + return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned()))); + } + let accs = &accumulators.group_states[0].accumulator_set; + let mut acc_data_types: Vec<usize> = vec![]; + + // Calculate number/shape of state arrays + match mode { + AggregateMode::Partial => { + for acc in accs.iter() { + let state = acc.state()?; + acc_data_types.push(state.len()); + } + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + acc_data_types = vec![1; accs.len()]; + } + } + + let mut columns = (0..num_group_expr) + .map(|i| { + ScalarValue::iter_to_array( + accumulators + .group_states + .iter() + .map(|group_state| group_state.group_by_values[i].clone()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + // add state / evaluated arrays + for (x, &state_len) in acc_data_types.iter().enumerate() { + for y in 0..state_len { + match mode { + AggregateMode::Partial => { + let res = ScalarValue::iter_to_array( + accumulators.group_states.iter().map(|group_state| { + let x = group_state.accumulator_set[x].state().unwrap(); + x[y].clone() + }), + )?; + + columns.push(res); + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + let res = ScalarValue::iter_to_array( + accumulators.group_states.iter().map(|group_state| { + group_state.accumulator_set[x].evaluate().unwrap() + }), + )?; + columns.push(res); + } + } + } + } + + // cast output if needed (e.g.
for types like Dictionary where + // the intermediate GroupByScalar type was not the same as the + // output + let columns = columns + .iter() + .zip(output_schema.fields().iter()) + .map(|(col, desired_field)| cast(col, desired_field.data_type())) + .collect::>>()?; + + RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns) +} diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index c0208b23974b..5e1da793c3ea 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -17,5 +17,724 @@ //! Aggregates functionalities +use crate::execution::context::TaskContext; +use crate::physical_plan::aggregates::hash::GroupedHashAggregateStream; +use crate::physical_plan::aggregates::no_grouping::AggregateStream; +use crate::physical_plan::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; +use crate::physical_plan::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, +}; +use arrow::array::ArrayRef; +use arrow::datatypes::{Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_expr::Accumulator; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{ + expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, +}; +use std::any::Any; +use std::sync::Arc; + +mod hash; +mod no_grouping; + pub use datafusion_expr::AggregateFunction; pub use datafusion_physical_expr::expressions::create_aggregate_expr; + +/// Hash aggregate modes +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum AggregateMode { + /// Partial aggregate that can be applied in parallel across input partitions + Partial, + /// Final aggregate that produces a single partition of output + Final, + /// Final aggregate that works on pre-partitioned data. + /// + /// This requires the invariant that all rows with a particular + /// grouping key are in the same partitions, such as is the case + /// with Hash repartitioning on the group keys. If a group key is + /// duplicated, duplicate groups would be produced + FinalPartitioned, +} + +/// Hash aggregate execution plan +#[derive(Debug)] +pub struct AggregateExec { + /// Aggregation mode (full, partial) + mode: AggregateMode, + /// Grouping expressions + group_expr: Vec<(Arc, String)>, + /// Aggregate expressions + aggr_expr: Vec>, + /// Input plan, could be a partial aggregate or the input to the aggregate + input: Arc, + /// Schema after the aggregate is applied + schema: SchemaRef, + /// Input schema before any aggregation is applied. 
For partial aggregate this will be the + /// same as input.schema() but for the final aggregate it will be the same as the input + /// to the partial aggregate + input_schema: SchemaRef, + /// Execution Metrics + metrics: ExecutionPlanMetricsSet, +} + +impl AggregateExec { + /// Create a new hash aggregate execution plan + pub fn try_new( + mode: AggregateMode, + group_expr: Vec<(Arc, String)>, + aggr_expr: Vec>, + input: Arc, + input_schema: SchemaRef, + ) -> Result { + let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, mode)?; + + let schema = Arc::new(schema); + + Ok(AggregateExec { + mode, + group_expr, + aggr_expr, + input, + schema, + input_schema, + metrics: ExecutionPlanMetricsSet::new(), + }) + } + + /// Aggregation mode (full, partial) + pub fn mode(&self) -> &AggregateMode { + &self.mode + } + + /// Grouping expressions + pub fn group_expr(&self) -> &[(Arc, String)] { + &self.group_expr + } + + /// Grouping expressions as they occur in the output schema + pub fn output_group_expr(&self) -> Vec> { + // Update column indices. Since the group by columns come first in the output schema, their + // indices are simply 0..self.group_expr(len). + self.group_expr + .iter() + .enumerate() + .map(|(index, (_col, name))| { + Arc::new(expressions::Column::new(name, index)) as Arc + }) + .collect() + } + + /// Aggregate expressions + pub fn aggr_expr(&self) -> &[Arc] { + &self.aggr_expr + } + + /// Input plan + pub fn input(&self) -> &Arc { + &self.input + } + + /// Get the input schema before any aggregates are applied + pub fn input_schema(&self) -> SchemaRef { + self.input_schema.clone() + } +} + +#[async_trait] +impl ExecutionPlan for AggregateExec { + /// Return a reference to Any that can be used for down-casting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn required_child_distribution(&self) -> Distribution { + match &self.mode { + AggregateMode::Partial => Distribution::UnspecifiedDistribution, + AggregateMode::FinalPartitioned => Distribution::HashPartitioned( + self.group_expr.iter().map(|x| x.0.clone()).collect(), + ), + AggregateMode::Final => Distribution::SinglePartition, + } + } + + fn relies_on_input_order(&self) -> bool { + false + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(AggregateExec::try_new( + self.mode, + self.group_expr.clone(), + self.aggr_expr.clone(), + children[0].clone(), + self.input_schema.clone(), + )?)) + } + + async fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let input = self.input.execute(partition, context).await?; + let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect(); + + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + + if self.group_expr.is_empty() { + Ok(Box::pin(AggregateStream::new( + self.mode, + self.schema.clone(), + self.aggr_expr.clone(), + input, + baseline_metrics, + )?)) + } else { + Ok(Box::pin(GroupedHashAggregateStream::new( + self.mode, + self.schema.clone(), + group_expr, + self.aggr_expr.clone(), + input, + baseline_metrics, + )?)) + } + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: 
&mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "AggregateExec: mode={:?}", self.mode)?; + let g: Vec<String> = self + .group_expr + .iter() + .map(|(e, alias)| { + let e = e.to_string(); + if &e != alias { + format!("{} as {}", e, alias) + } else { + e + } + }) + .collect(); + write!(f, ", gby=[{}]", g.join(", "))?; + + let a: Vec<String> = self + .aggr_expr + .iter() + .map(|agg| agg.name().to_string()) + .collect(); + write!(f, ", aggr=[{}]", a.join(", "))?; + } + } + Ok(()) + } + + fn statistics(&self) -> Statistics { + // TODO stats: group expressions: + // - once expressions will be able to compute their own stats, use it here + // - case where we group by on a column for which we have the `distinct` stat + // TODO stats: aggr expression: + // - aggregations sometimes also preserve invariants such as min, max... + match self.mode { + AggregateMode::Final | AggregateMode::FinalPartitioned + if self.group_expr.is_empty() => + { + Statistics { + num_rows: Some(1), + is_exact: true, + ..Default::default() + } + } + _ => Statistics::default(), + } + } +} + +fn create_schema( + input_schema: &Schema, + group_expr: &[(Arc<dyn PhysicalExpr>, String)], + aggr_expr: &[Arc<dyn AggregateExpr>], + mode: AggregateMode, +) -> datafusion_common::Result<Schema> { + let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len()); + for (expr, name) in group_expr { + fields.push(Field::new( + name, + expr.data_type(input_schema)?, + expr.nullable(input_schema)?, + )) + } + + match mode { + AggregateMode::Partial => { + // in partial mode, the fields of the accumulator's state + for expr in aggr_expr { + fields.extend(expr.state_fields()?.iter().cloned()) + } + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + // in final mode, the field with the final result of the accumulator + for expr in aggr_expr { + fields.push(expr.field()?) + } + } + } + + Ok(Schema::new(fields)) +} + +/// returns physical expressions to evaluate against a batch +/// The expressions are different depending on `mode`: +/// * Partial: AggregateExpr::expressions +/// * Final: columns of `AggregateExpr::state_fields()` +fn aggregate_expressions( + aggr_expr: &[Arc<dyn AggregateExpr>], + mode: &AggregateMode, + col_idx_base: usize, +) -> datafusion_common::Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> { + match mode { + AggregateMode::Partial => { + Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect()) + } + // in this mode, we build the merge expressions of the aggregation + AggregateMode::Final | AggregateMode::FinalPartitioned => { + let mut col_idx_base = col_idx_base; + Ok(aggr_expr + .iter() + .map(|agg| { + let exprs = merge_expressions(col_idx_base, agg)?; + col_idx_base += exprs.len(); + Ok(exprs) + }) + .collect::<Result<Vec<_>>>()?) + } + } +} + +/// uses `state_fields` to build a vec of physical column expressions required to merge the +/// AggregateExpr's accumulator state. +/// +/// `index_base` is the starting physical column index for the next expanded state field. +fn merge_expressions( + index_base: usize, + expr: &Arc<dyn AggregateExpr>, +) -> Result<Vec<Arc<dyn PhysicalExpr>>> { + Ok(expr + .state_fields()?
+ .iter() + .enumerate() + .map(|(idx, f)| { + Arc::new(Column::new(f.name(), index_base + idx)) as Arc + }) + .collect::>()) +} + +pub(crate) type AccumulatorItem = Box; + +fn create_accumulators( + aggr_expr: &[Arc], +) -> datafusion_common::Result> { + aggr_expr + .iter() + .map(|expr| expr.create_accumulator()) + .collect::>>() +} + +/// returns a vector of ArrayRefs, where each entry corresponds to either the +/// final value (mode = Final) or states (mode = Partial) +fn finalize_aggregation( + accumulators: &[AccumulatorItem], + mode: &AggregateMode, +) -> datafusion_common::Result> { + match mode { + AggregateMode::Partial => { + // build the vector of states + let a = accumulators + .iter() + .map(|accumulator| accumulator.state()) + .map(|value| { + value.map(|e| { + e.iter().map(|v| v.to_array()).collect::>() + }) + }) + .collect::>>()?; + Ok(a.iter().flatten().cloned().collect::>()) + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + // merge the state to the final value + accumulators + .iter() + .map(|accumulator| accumulator.evaluate().map(|v| v.to_array())) + .collect::>>() + } + } +} + +#[cfg(test)] +mod tests { + use crate::execution::context::TaskContext; + use crate::from_slice::FromSlice; + use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; + use crate::physical_plan::expressions::{col, Avg}; + use crate::test::assert_is_pending; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; + use crate::{assert_batches_sorted_eq, physical_plan::common}; + use arrow::array::{Float64Array, UInt32Array}; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::error::Result as ArrowResult; + use arrow::record_batch::RecordBatch; + use async_trait::async_trait; + use datafusion_common::{DataFusionError, Result}; + use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; + use futures::{FutureExt, Stream}; + use std::any::Any; + use std::sync::Arc; + use std::task::{Context, Poll}; + + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use crate::physical_plan::{ + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + Statistics, + }; + use crate::prelude::SessionContext; + + /// some mock data to aggregates + fn some_data() -> (Arc, Vec) { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + // define data. 
+ ( + schema.clone(), + vec![ + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])), + Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 4.0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])), + Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 4.0])), + ], + ) + .unwrap(), + ], + ) + } + + /// build the aggregates on the data from some_data() and check the results + async fn check_aggregates(input: Arc) -> Result<()> { + let input_schema = input.schema(); + + let groups: Vec<(Arc, String)> = + vec![(col("a", &input_schema)?, "a".to_string())]; + + let aggregates: Vec> = vec![Arc::new(Avg::new( + col("b", &input_schema)?, + "AVG(b)".to_string(), + DataType::Float64, + ))]; + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let partial_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates.clone(), + input, + input_schema.clone(), + )?); + + let result = + common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?) + .await?; + + let expected = vec![ + "+---+---------------+-------------+", + "| a | AVG(b)[count] | AVG(b)[sum] |", + "+---+---------------+-------------+", + "| 2 | 2 | 2 |", + "| 3 | 3 | 7 |", + "| 4 | 3 | 11 |", + "+---+---------------+-------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); + + let final_group: Vec> = (0..groups.len()) + .map(|i| col(&groups[i].1, &input_schema)) + .collect::>()?; + + let merged_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Final, + final_group + .iter() + .enumerate() + .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) + .collect(), + aggregates, + merge, + input_schema, + )?); + + let result = + common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?; + assert_eq!(result.len(), 1); + + let batch = &result[0]; + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + let expected = vec![ + "+---+--------------------+", + "| a | AVG(b) |", + "+---+--------------------+", + "| 2 | 1 |", + "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3 + "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3 + "+---+--------------------+", + ]; + + assert_batches_sorted_eq!(&expected, &result); + + let metrics = merged_aggregate.metrics().unwrap(); + let output_rows = metrics.output_rows().unwrap(); + assert_eq!(3, output_rows); + + Ok(()) + } + + /// Define a test source that can yield back to runtime before returning its first item /// + + #[derive(Debug)] + struct TestYieldingExec { + /// True if this exec should yield back to runtime the first time it is polled + pub yield_first: bool, + } + + #[async_trait] + impl ExecutionPlan for TestYieldingExec { + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + some_data().0 + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Err(DataFusionError::Internal(format!( + "Children cannot be replaced in {:?}", + self + ))) + } + + async fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + let stream = if self.yield_first { + TestYieldingStream::New + } else { + 
TestYieldingStream::Yielded + }; + + Ok(Box::pin(stream)) + } + + fn statistics(&self) -> Statistics { + let (_, batches) = some_data(); + common::compute_record_batch_statistics(&[batches], &self.schema(), None) + } + } + + /// A stream using the demo data. If inited as new, it will first yield to runtime before returning records + enum TestYieldingStream { + New, + Yielded, + ReturnedBatch1, + ReturnedBatch2, + } + + impl Stream for TestYieldingStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match &*self { + TestYieldingStream::New => { + *(self.as_mut()) = TestYieldingStream::Yielded; + cx.waker().wake_by_ref(); + Poll::Pending + } + TestYieldingStream::Yielded => { + *(self.as_mut()) = TestYieldingStream::ReturnedBatch1; + Poll::Ready(Some(Ok(some_data().1[0].clone()))) + } + TestYieldingStream::ReturnedBatch1 => { + *(self.as_mut()) = TestYieldingStream::ReturnedBatch2; + Poll::Ready(Some(Ok(some_data().1[1].clone()))) + } + TestYieldingStream::ReturnedBatch2 => Poll::Ready(None), + } + } + } + + impl RecordBatchStream for TestYieldingStream { + fn schema(&self) -> SchemaRef { + some_data().0 + } + } + + //// Tests //// + + #[tokio::test] + async fn aggregate_source_not_yielding() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: false }); + + check_aggregates(input).await + } + + #[tokio::test] + async fn aggregate_source_with_yielding() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: true }); + + check_aggregates(input).await + } + + #[tokio::test] + async fn test_drop_cancel_without_groups() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + + let groups = vec![]; + + let aggregates: Vec> = vec![Arc::new(Avg::new( + col("a", &schema)?, + "AVG(a)".to_string(), + DataType::Float64, + ))]; + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); + let refs = blocking_exec.refs(); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates.clone(), + blocking_exec, + schema, + )?); + + let fut = crate::physical_plan::collect(aggregate_exec, task_ctx); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } + + #[tokio::test] + async fn test_drop_cancel_with_groups() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float32, true), + Field::new("b", DataType::Float32, true), + ])); + + let groups: Vec<(Arc, String)> = + vec![(col("a", &schema)?, "a".to_string())]; + + let aggregates: Vec> = vec![Arc::new(Avg::new( + col("b", &schema)?, + "AVG(b)".to_string(), + DataType::Float64, + ))]; + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); + let refs = blocking_exec.refs(); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates.clone(), + blocking_exec, + schema, + )?); + + let fut = crate::physical_plan::collect(aggregate_exec, task_ctx); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs 
b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs new file mode 100644 index 000000000000..f687c982c220 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Aggregate without grouping columns + +use crate::physical_plan::aggregates::{ + aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem, + AggregateMode, +}; +use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; +use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; +use arrow::datatypes::SchemaRef; +use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::{ + ready, + stream::{Stream, StreamExt}, +}; + +/// stream struct for aggregation without grouping columns +pub(crate) struct AggregateStream { + schema: SchemaRef, + mode: AggregateMode, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + aggregate_expressions: Vec>>, + accumulators: Vec, + finished: bool, +} + +impl AggregateStream { + /// Create a new AggregateStream + pub fn new( + mode: AggregateMode, + schema: SchemaRef, + aggr_expr: Vec>, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + ) -> datafusion_common::Result { + let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?; + let accumulators = create_accumulators(&aggr_expr)?; + + Ok(Self { + schema, + mode, + input, + baseline_metrics, + aggregate_expressions, + accumulators, + finished: false, + }) + } +} + +impl Stream for AggregateStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + if this.finished { + return Poll::Ready(None); + } + + let elapsed_compute = this.baseline_metrics.elapsed_compute(); + + loop { + let result = match ready!(this.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = aggregate_batch( + &this.mode, + &batch, + &mut this.accumulators, + &this.aggregate_expressions, + ); + + timer.done(); + + match result { + Ok(_) => continue, + Err(e) => Err(ArrowError::ExternalError(Box::new(e))), + } + } + Some(Err(e)) => Err(e), + None => { + this.finished = true; + let timer = this.baseline_metrics.elapsed_compute().timer(); + let result = finalize_aggregation(&this.accumulators, &this.mode) + .map_err(|e| ArrowError::ExternalError(Box::new(e))) + .and_then(|columns| { + RecordBatch::try_new(this.schema.clone(), columns) + }) + .record_output(&this.baseline_metrics); + + 
timer.done(); + result + } + }; + + this.finished = true; + return Poll::Ready(Some(result)); + } + } +} + +impl RecordBatchStream for AggregateStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// TODO: Make this a member function +fn aggregate_batch( + mode: &AggregateMode, + batch: &RecordBatch, + accumulators: &mut [AccumulatorItem], + expressions: &[Vec>], +) -> Result<()> { + // 1.1 iterate accumulators and respective expressions together + // 1.2 evaluate expressions + // 1.3 update / merge accumulators with the expressions' values + + // 1.1 + accumulators + .iter_mut() + .zip(expressions) + .try_for_each(|(accum, expr)| { + // 1.2 + let values = &expr + .iter() + .map(|e| e.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect::>>()?; + + // 1.3 + match mode { + AggregateMode::Partial => accum.update_batch(values), + AggregateMode::Final | AggregateMode::FinalPartitioned => { + accum.merge_batch(values) + } + } + }) +} diff --git a/datafusion/core/src/physical_plan/hash_aggregate.rs b/datafusion/core/src/physical_plan/hash_aggregate.rs deleted file mode 100644 index 643174557997..000000000000 --- a/datafusion/core/src/physical_plan/hash_aggregate.rs +++ /dev/null @@ -1,1299 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines the execution plan for the hash aggregate operation - -use std::any::Any; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::vec; - -use ahash::RandomState; -use futures::{ - ready, - stream::{Stream, StreamExt}, -}; - -use crate::error::Result; -use crate::physical_plan::hash_utils::create_hashes; -use crate::physical_plan::{ - Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, PhysicalExpr, -}; -use crate::scalar::ScalarValue; - -use arrow::{array::ArrayRef, compute, compute::cast}; -use arrow::{ - array::{Array, UInt32Builder}, - error::{ArrowError, Result as ArrowResult}, -}; -use arrow::{ - datatypes::{Field, Schema, SchemaRef}, - record_batch::RecordBatch, -}; -use hashbrown::raw::RawTable; - -use crate::execution::context::TaskContext; -use async_trait::async_trait; - -use super::expressions::PhysicalSortExpr; -use super::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, -}; -use super::Statistics; -use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream}; - -/// Hash aggregate modes -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum AggregateMode { - /// Partial aggregate that can be applied in parallel across input partitions - Partial, - /// Final aggregate that produces a single partition of output - Final, - /// Final aggregate that works on pre-partitioned data. 
- /// - /// This requires the invariant that all rows with a particular - /// grouping key are in the same partition, such as is the case - /// with Hash repartitioning on the group keys. If a group key is - /// duplicated across partitions, duplicate groups would be produced - FinalPartitioned, -} - -/// Hash aggregate execution plan -#[derive(Debug)] -pub struct HashAggregateExec { - /// Aggregation mode (full, partial) - mode: AggregateMode, - /// Grouping expressions - group_expr: Vec<(Arc, String)>, - /// Aggregate expressions - aggr_expr: Vec>, - /// Input plan, could be a partial aggregate or the input to the aggregate - input: Arc, - /// Schema after the aggregate is applied - schema: SchemaRef, - /// Input schema before any aggregation is applied. For partial aggregate this will be the - /// same as input.schema() but for the final aggregate it will be the same as the input - /// to the partial aggregate - input_schema: SchemaRef, - /// Execution Metrics - metrics: ExecutionPlanMetricsSet, -} - -fn create_schema( - input_schema: &Schema, - group_expr: &[(Arc, String)], - aggr_expr: &[Arc], - mode: AggregateMode, -) -> Result { - let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len()); - for (expr, name) in group_expr { - fields.push(Field::new( - name, - expr.data_type(input_schema)?, - expr.nullable(input_schema)?, - )) - } - - match mode { - AggregateMode::Partial => { - // in partial mode, the fields of the accumulator's state - for expr in aggr_expr { - fields.extend(expr.state_fields()?.iter().cloned()) - } - } - AggregateMode::Final | AggregateMode::FinalPartitioned => { - // in final mode, the field with the final result of the accumulator - for expr in aggr_expr { - fields.push(expr.field()?) - } - } - } - - Ok(Schema::new(fields)) -} - -impl HashAggregateExec { - /// Create a new hash aggregate execution plan - pub fn try_new( - mode: AggregateMode, - group_expr: Vec<(Arc, String)>, - aggr_expr: Vec>, - input: Arc, - input_schema: SchemaRef, - ) -> Result { - let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, mode)?; - - let schema = Arc::new(schema); - - Ok(HashAggregateExec { - mode, - group_expr, - aggr_expr, - input, - schema, - input_schema, - metrics: ExecutionPlanMetricsSet::new(), - }) - } - - /// Aggregation mode (full, partial) - pub fn mode(&self) -> &AggregateMode { - &self.mode - } - - /// Grouping expressions - pub fn group_expr(&self) -> &[(Arc, String)] { - &self.group_expr - } - - /// Grouping expressions as they occur in the output schema - pub fn output_group_expr(&self) -> Vec> { - // Update column indices. Since the group by columns come first in the output schema, their - // indices are simply 0..self.group_expr.len().
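- // e.g. GROUP BY a, b with AVG(c) yields output columns [a, b, AVG(c)], - // so a maps to index 0 and b to index 1.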
- self.group_expr - .iter() - .enumerate() - .map(|(index, (_col, name))| { - Arc::new(Column::new(name, index)) as Arc - }) - .collect() - } - - /// Aggregate expressions - pub fn aggr_expr(&self) -> &[Arc] { - &self.aggr_expr - } - - /// Input plan - pub fn input(&self) -> &Arc { - &self.input - } - - /// Get the input schema before any aggregates are applied - pub fn input_schema(&self) -> SchemaRef { - self.input_schema.clone() - } -} - -#[async_trait] -impl ExecutionPlan for HashAggregateExec { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn required_child_distribution(&self) -> Distribution { - match &self.mode { - AggregateMode::Partial => Distribution::UnspecifiedDistribution, - AggregateMode::FinalPartitioned => Distribution::HashPartitioned( - self.group_expr.iter().map(|x| x.0.clone()).collect(), - ), - AggregateMode::Final => Distribution::SinglePartition, - } - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn relies_on_input_order(&self) -> bool { - false - } - - async fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - let input = self.input.execute(partition, context).await?; - let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect(); - - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - if self.group_expr.is_empty() { - Ok(Box::pin(HashAggregateStream::new( - self.mode, - self.schema.clone(), - self.aggr_expr.clone(), - input, - baseline_metrics, - )?)) - } else { - Ok(Box::pin(GroupedHashAggregateStream::new( - self.mode, - self.schema.clone(), - group_expr, - self.aggr_expr.clone(), - input, - baseline_metrics, - )?)) - } - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(HashAggregateExec::try_new( - self.mode, - self.group_expr.clone(), - self.aggr_expr.clone(), - children[0].clone(), - self.input_schema.clone(), - )?)) - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "HashAggregateExec: mode={:?}", self.mode)?; - let g: Vec = self - .group_expr - .iter() - .map(|(e, alias)| { - let e = e.to_string(); - if &e != alias { - format!("{} as {}", e, alias) - } else { - e - } - }) - .collect(); - write!(f, ", gby=[{}]", g.join(", "))?; - - let a: Vec = self - .aggr_expr - .iter() - .map(|agg| agg.name().to_string()) - .collect(); - write!(f, ", aggr=[{}]", a.join(", "))?; - } - } - Ok(()) - } - - fn statistics(&self) -> Statistics { - // TODO stats: group expressions: - // - once expressions are able to compute their own stats, use it here - // - case where we group by a column for which we have the `distinct` stat - // TODO stats: aggr expression: - // - aggregations sometimes also preserve invariants such as min, max... - match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_expr.is_empty() => - { - Statistics { - num_rows: Some(1), - is_exact: true, - ..Default::default() - } - } - _ => Statistics::default(), - } - } -} - -/* -The architecture is the following: - -1.
An accumulator has state that is updated on each batch. -2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row -3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) into a single RecordBatch. -4. The state's RecordBatch is `merge`d into a new state -5. The state is mapped to the final value - -Why: - -* Accumulators' state can be statically typed, but it is more efficient to transmit data from the accumulators via `Array` -* The `merge` operation must have access to the state of the aggregators because it uses it to correctly merge -* It uses Arrow's native dynamically typed object, `Array`. -* Arrow shines in batch operations and both `merge` and `concatenate` of uniform types are very performant. - -Example: average - -* the state is `n: u32` and `sum: f64` -* For every batch, we update them accordingly. -* At the end of the accumulation (of a partition), we convert `n` and `sum` to a RecordBatch of 1 row and two columns: `[n, sum]` -* The RecordBatch is (sent back / transmitted over network) -* Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns. -* Finally, `get_value` returns an array with one entry computed from the state -*/ -struct GroupedHashAggregateStream { - schema: SchemaRef, - input: SendableRecordBatchStream, - mode: AggregateMode, - accumulators: Accumulators, - aggregate_expressions: Vec>>, - - aggr_expr: Vec>, - group_expr: Vec>, - - baseline_metrics: BaselineMetrics, - random_state: RandomState, - finished: bool, -} - -impl GroupedHashAggregateStream { - /// Create a new GroupedHashAggregateStream - pub fn new( - mode: AggregateMode, - schema: SchemaRef, - group_expr: Vec>, - aggr_expr: Vec>, - input: SendableRecordBatchStream, - baseline_metrics: BaselineMetrics, - ) -> Result { - let timer = baseline_metrics.elapsed_compute().timer(); - - // The expressions to evaluate the batch, one vec of expressions per aggregation. - // Assume create_schema() always puts group columns in front of aggr columns, so we set - // col_idx_base to the group expression count.
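- // For example, AVG(b) grouped by a in Final mode reads the partial schema - // [a, AVG(b)[count], AVG(b)[sum]]: col_idx_base is 1, so the merge expressions - // for AVG(b) are the columns at indices 1 and 2.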
- let aggregate_expressions = - aggregate_expressions(&aggr_expr, &mode, group_expr.len())?; - - timer.done(); - - Ok(Self { - schema, - mode, - input, - aggr_expr, - group_expr, - baseline_metrics, - aggregate_expressions, - accumulators: Default::default(), - random_state: Default::default(), - finished: false, - }) - } -} - -impl Stream for GroupedHashAggregateStream { - type Item = ArrowResult; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = &mut *self; - if this.finished { - return Poll::Ready(None); - } - - let elapsed_compute = this.baseline_metrics.elapsed_compute(); - - loop { - let result = match ready!(this.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = elapsed_compute.timer(); - let result = group_aggregate_batch( - &this.mode, - &this.random_state, - &this.group_expr, - &this.aggr_expr, - batch, - &mut this.accumulators, - &this.aggregate_expressions, - ); - - timer.done(); - - match result { - Ok(_) => continue, - Err(e) => Err(ArrowError::ExternalError(Box::new(e))), - } - } - Some(Err(e)) => Err(e), - None => { - this.finished = true; - let timer = this.baseline_metrics.elapsed_compute().timer(); - let result = create_batch_from_map( - &this.mode, - &this.accumulators, - this.group_expr.len(), - &this.schema, - ) - .record_output(&this.baseline_metrics); - - timer.done(); - result - } - }; - - this.finished = true; - return Poll::Ready(Some(result)); - } - } -} - -impl RecordBatchStream for GroupedHashAggregateStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -/// TODO: Make this a member function of [`GroupedHashAggregateStream`] -fn group_aggregate_batch( - mode: &AggregateMode, - random_state: &RandomState, - group_expr: &[Arc], - aggr_expr: &[Arc], - batch: RecordBatch, - accumulators: &mut Accumulators, - aggregate_expressions: &[Vec>], -) -> Result<()> { - // evaluate the grouping expressions - let group_values = evaluate(group_expr, &batch)?; - - // evaluate the aggregation expressions. - // We could evaluate them after the `take`, but since we need to evaluate all - // of them anyway, it is more performant to do it while they are together.
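- // (One vectorized evaluation per expression over the whole batch, rather - // than one evaluation per group after the take.)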
- let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?; - - // 1.1 construct the key from the group values - // 1.2 construct the mapping key if it does not exist - // 1.3 add the row's index to `indices` - - // track which entries in `accumulators` have rows in this batch to aggregate - let mut groups_with_rows = vec![]; - - // 1.1 Calculate the group keys for the group values - let mut batch_hashes = vec![0; batch.num_rows()]; - create_hashes(&group_values, random_state, &mut batch_hashes)?; - - for (row, hash) in batch_hashes.into_iter().enumerate() { - let Accumulators { map, group_states } = accumulators; - - let entry = map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - let group_state = &group_states[*group_idx]; - group_values - .iter() - .zip(group_state.group_by_values.iter()) - .all(|(array, scalar)| scalar.eq_array(array, row)) - }); - - match entry { - // Existing entry for this group value - Some((_hash, group_idx)) => { - let group_state = &mut group_states[*group_idx]; - // 1.3 - if group_state.indices.is_empty() { - groups_with_rows.push(*group_idx); - }; - group_state.indices.push(row as u32); // remember this row - } - // 1.2 Need to create new entry - None => { - let accumulator_set = create_accumulators(aggr_expr)?; - - // Copy group values out of arrays into `ScalarValue`s - let group_by_values = group_values - .iter() - .map(|col| ScalarValue::try_from_array(col, row)) - .collect::>>()?; - - // Add new entry to group_states and save newly created index - let group_state = GroupState { - group_by_values: group_by_values.into_boxed_slice(), - accumulator_set, - indices: vec![row as u32], // 1.3 - }; - let group_idx = group_states.len(); - group_states.push(group_state); - groups_with_rows.push(group_idx); - - // for hasher function, use precomputed hash value - map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash); - } - }; - } - - // Collect all indices + offsets based on keys in this vec - let mut batch_indices: UInt32Builder = UInt32Builder::new(0); - let mut offsets = vec![0]; - let mut offset_so_far = 0; - for group_idx in groups_with_rows.iter() { - let indices = &accumulators.group_states[*group_idx].indices; - batch_indices.append_slice(indices)?; - offset_so_far += indices.len(); - offsets.push(offset_so_far); - } - let batch_indices = batch_indices.finish(); - - // `Take` all values based on indices into Arrays - let values: Vec>> = aggr_input_values - .iter() - .map(|array| { - array - .iter() - .map(|array| { - compute::take( - array.as_ref(), - &batch_indices, - None, // None: no index check - ) - .unwrap() - }) - .collect() - // 2.3 - }) - .collect(); - - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the key's values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - groups_with_rows - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(group_idx, offsets)| { - let group_state = &mut accumulators.group_states[*group_idx]; - // 2.2 - group_state - .accumulator_set - .iter_mut() - .zip(values.iter()) - .map(|(accumulator, aggr_array)| { - ( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - array.slice(offsets[0], offsets[1] - offsets[0]) - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| match mode { - AggregateMode::Partial => accumulator.update_batch(&values), -
AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }) - // 2.5 - .and({ - group_state.indices.clear(); - Ok(()) - }) - })?; - - Ok(()) -} - -type AccumulatorItem = Box; - -/// The state that is built for each output group. -#[derive(Debug)] -struct GroupState { - /// The actual group by values, one for each group column - group_by_values: Box<[ScalarValue]>, - - /// Accumulator state, one for each aggregate - accumulator_set: Vec, - - /// scratch space used to collect indices for input rows in a - /// batch that have values to aggregate. Reset on each batch - indices: Vec, -} - -/// The state of all the groups -#[derive(Default)] -struct Accumulators { - /// Logically maps group values to an index in `group_states` - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, index into `group_states`) - map: RawTable<(u64, usize)>, - - /// State for each group - group_states: Vec, -} - -impl std::fmt::Debug for Accumulators { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - // hashes are not stored inline, so we can only show the group states - let map_string = "RawTable"; - f.debug_struct("Accumulators") - .field("map", &map_string) - .field("group_states", &self.group_states) - .finish() - } -} - -/// Evaluates expressions against a record batch. -fn evaluate( - expr: &[Arc], - batch: &RecordBatch, -) -> Result> { - expr.iter() - .map(|expr| expr.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>() -} - -/// Evaluates expressions against a record batch. -fn evaluate_many( - expr: &[Vec>], - batch: &RecordBatch, -) -> Result>> { - expr.iter() - .map(|expr| evaluate(expr, batch)) - .collect::>>() -} - -/// uses `state_fields` to build a vec of physical column expressions required to merge the -/// AggregateExpr's accumulator state. -/// -/// `index_base` is the starting physical column index for the next expanded state field. -fn merge_expressions( - index_base: usize, - expr: &Arc, -) -> Result>> { - Ok(expr - .state_fields()? - .iter() - .enumerate() - .map(|(idx, f)| { - Arc::new(Column::new(f.name(), index_base + idx)) as Arc - }) - .collect::>()) -} - -/// returns physical expressions to evaluate against a batch. -/// The expressions are different depending on `mode`: -/// * Partial: AggregateExpr::expressions -/// * Final: columns of `AggregateExpr::state_fields()` -fn aggregate_expressions( - aggr_expr: &[Arc], - mode: &AggregateMode, - col_idx_base: usize, -) -> Result>>> { - match mode { - AggregateMode::Partial => { - Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect()) - } - // in this mode, we build the merge expressions of the aggregation - AggregateMode::Final | AggregateMode::FinalPartitioned => { - let mut col_idx_base = col_idx_base; - Ok(aggr_expr - .iter() - .map(|agg| { - let exprs = merge_expressions(col_idx_base, agg)?; - col_idx_base += exprs.len(); - Ok(exprs) - }) - .collect::>>()?)
- } - } -} - -/// stream struct for hash aggregation -pub struct HashAggregateStream { - schema: SchemaRef, - mode: AggregateMode, - input: SendableRecordBatchStream, - baseline_metrics: BaselineMetrics, - aggregate_expressions: Vec>>, - accumulators: Vec, - finished: bool, -} - -impl HashAggregateStream { - /// Create a new HashAggregateStream - pub fn new( - mode: AggregateMode, - schema: SchemaRef, - aggr_expr: Vec>, - input: SendableRecordBatchStream, - baseline_metrics: BaselineMetrics, - ) -> Result { - let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?; - let accumulators = create_accumulators(&aggr_expr)?; - - Ok(Self { - schema, - mode, - input, - baseline_metrics, - aggregate_expressions, - accumulators, - finished: false, - }) - } -} - -/// TODO: Make this a member function -fn aggregate_batch( - mode: &AggregateMode, - batch: &RecordBatch, - accumulators: &mut [AccumulatorItem], - expressions: &[Vec>], -) -> Result<()> { - // 1.1 iterate accumulators and respective expressions together - // 1.2 evaluate expressions - // 1.3 update / merge accumulators with the expressions' values - - // 1.1 - accumulators - .iter_mut() - .zip(expressions) - .try_for_each(|(accum, expr)| { - // 1.2 - let values = &expr - .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>()?; - - // 1.3 - match mode { - AggregateMode::Partial => accum.update_batch(values), - AggregateMode::Final | AggregateMode::FinalPartitioned => { - accum.merge_batch(values) - } - } - }) -} - -impl Stream for HashAggregateStream { - type Item = ArrowResult; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = &mut *self; - if this.finished { - return Poll::Ready(None); - } - - let elapsed_compute = this.baseline_metrics.elapsed_compute(); - - loop { - let result = match ready!(this.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = elapsed_compute.timer(); - let result = aggregate_batch( - &this.mode, - &batch, - &mut this.accumulators, - &this.aggregate_expressions, - ); - - timer.done(); - - match result { - Ok(_) => continue, - Err(e) => Err(ArrowError::ExternalError(Box::new(e))), - } - } - Some(Err(e)) => Err(e), - None => { - this.finished = true; - let timer = this.baseline_metrics.elapsed_compute().timer(); - let result = finalize_aggregation(&this.accumulators, &this.mode) - .map_err(|e| ArrowError::ExternalError(Box::new(e))) - .and_then(|columns| { - RecordBatch::try_new(this.schema.clone(), columns) - }) - .record_output(&this.baseline_metrics); - - timer.done(); - result - } - }; - - this.finished = true; - return Poll::Ready(Some(result)); - } - } -} - -impl RecordBatchStream for HashAggregateStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -/// Create a RecordBatch with all group keys and accumulators' states or values.
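-/// -/// For AVG(b) grouped by a this yields [a, AVG(b)[count], AVG(b)[sum]] in Partial -/// mode (one column per accumulator state field) and [a, AVG(b)] in Final mode.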
-fn create_batch_from_map( - mode: &AggregateMode, - accumulators: &Accumulators, - num_group_expr: usize, - output_schema: &Schema, -) -> ArrowResult { - if accumulators.group_states.is_empty() { - return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned()))); - } - let accs = &accumulators.group_states[0].accumulator_set; - let mut acc_data_types: Vec = vec![]; - - // Calculate number/shape of state arrays - match mode { - AggregateMode::Partial => { - for acc in accs.iter() { - let state = acc.state()?; - acc_data_types.push(state.len()); - } - } - AggregateMode::Final | AggregateMode::FinalPartitioned => { - acc_data_types = vec![1; accs.len()]; - } - } - - let mut columns = (0..num_group_expr) - .map(|i| { - ScalarValue::iter_to_array( - accumulators - .group_states - .iter() - .map(|group_state| group_state.group_by_values[i].clone()), - ) - }) - .collect::>>()?; - - // add state / evaluated arrays - for (x, &state_len) in acc_data_types.iter().enumerate() { - for y in 0..state_len { - match mode { - AggregateMode::Partial => { - let res = ScalarValue::iter_to_array( - accumulators.group_states.iter().map(|group_state| { - let x = group_state.accumulator_set[x].state().unwrap(); - x[y].clone() - }), - )?; - - columns.push(res); - } - AggregateMode::Final | AggregateMode::FinalPartitioned => { - let res = ScalarValue::iter_to_array( - accumulators.group_states.iter().map(|group_state| { - group_state.accumulator_set[x].evaluate().unwrap() - }), - )?; - columns.push(res); - } - } - } - } - - // cast output if needed (e.g. for types like Dictionary where - // the intermediate GroupByScalar type was not the same as the - // output) - let columns = columns - .iter() - .zip(output_schema.fields().iter()) - .map(|(col, desired_field)| cast(col, desired_field.data_type())) - .collect::>>()?; - - RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns) -} - -fn create_accumulators( - aggr_expr: &[Arc], -) -> Result> { - aggr_expr - .iter() - .map(|expr| expr.create_accumulator()) - .collect::>>() -} - -/// returns a vector of ArrayRefs, where each entry corresponds to either the -/// final value (mode = Final) or states (mode = Partial) -fn finalize_aggregation( - accumulators: &[AccumulatorItem], - mode: &AggregateMode, -) -> Result> { - match mode { - AggregateMode::Partial => { - // build the vector of states - let a = accumulators - .iter() - .map(|accumulator| accumulator.state()) - .map(|value| { - value.map(|e| { - e.iter().map(|v| v.to_array()).collect::>() - }) - }) - .collect::>>()?; - Ok(a.iter().flatten().cloned().collect::>()) - } - AggregateMode::Final | AggregateMode::FinalPartitioned => { - // merge the state into the final value - accumulators - .iter() - .map(|accumulator| accumulator.evaluate().map(|v| v.to_array())) - .collect::>>() - } - } -} - -#[cfg(test)] -mod tests { - - use super::*; - use crate::from_slice::FromSlice; - use crate::physical_plan::expressions::{col, Avg}; - use crate::test::assert_is_pending; - use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::{assert_batches_sorted_eq, physical_plan::common}; - use arrow::array::{Float64Array, UInt32Array}; - use arrow::datatypes::DataType; - use datafusion_common::DataFusionError; - use futures::FutureExt; - - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::prelude::SessionContext; - - /// some mock data to aggregate - fn some_data() -> (Arc, Vec) { - // define a schema.
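- // (a: UInt32, b: Float64); every group key appears in both batches, so the - // partial accumulators must fold values across batch boundaries.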
- let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::UInt32, false), - Field::new("b", DataType::Float64, false), - ])); - - // define data. - ( - schema.clone(), - vec![ - RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])), - Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 4.0])), - ], - ) - .unwrap(), - RecordBatch::try_new( - schema, - vec![ - Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])), - Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 4.0])), - ], - ) - .unwrap(), - ], - ) - } - - /// build the aggregates on the data from some_data() and check the results - async fn check_aggregates(input: Arc) -> Result<()> { - let input_schema = input.schema(); - - let groups: Vec<(Arc, String)> = - vec![(col("a", &input_schema)?, "a".to_string())]; - - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("b", &input_schema)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; - - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - - let partial_aggregate = Arc::new(HashAggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates.clone(), - input, - input_schema.clone(), - )?); - - let result = - common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?) - .await?; - - let expected = vec![ - "+---+---------------+-------------+", - "| a | AVG(b)[count] | AVG(b)[sum] |", - "+---+---------------+-------------+", - "| 2 | 2 | 2 |", - "| 3 | 3 | 7 |", - "| 4 | 3 | 11 |", - "+---+---------------+-------------+", - ]; - assert_batches_sorted_eq!(expected, &result); - - let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); - - let final_group: Vec> = (0..groups.len()) - .map(|i| col(&groups[i].1, &input_schema)) - .collect::>()?; - - let merged_aggregate = Arc::new(HashAggregateExec::try_new( - AggregateMode::Final, - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - .collect(), - aggregates, - merge, - input_schema, - )?); - - let result = - common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?; - assert_eq!(result.len(), 1); - - let batch = &result[0]; - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 3); - - let expected = vec![ - "+---+--------------------+", - "| a | AVG(b) |", - "+---+--------------------+", - "| 2 | 1 |", - "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3 - "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3 - "+---+--------------------+", - ]; - - assert_batches_sorted_eq!(&expected, &result); - - let metrics = merged_aggregate.metrics().unwrap(); - let output_rows = metrics.output_rows().unwrap(); - assert_eq!(3, output_rows); - - Ok(()) - } - - /// Define a test source that can yield back to runtime before returning its first item /// - - #[derive(Debug)] - struct TestYieldingExec { - /// True if this exec should yield back to runtime the first time it is polled - pub yield_first: bool, - } - - #[async_trait] - impl ExecutionPlan for TestYieldingExec { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { - some_data().0 - } - - fn children(&self) -> Vec> { - vec![] - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Err(DataFusionError::Internal(format!( - "Children cannot be replaced in {:?}", - self - ))) - } - - async fn 
execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - let stream = if self.yield_first { - TestYieldingStream::New - } else { - TestYieldingStream::Yielded - }; - - Ok(Box::pin(stream)) - } - - fn statistics(&self) -> Statistics { - let (_, batches) = some_data(); - common::compute_record_batch_statistics(&[batches], &self.schema(), None) - } - } - - /// A stream using the demo data. If initialized as `New`, it will first yield to the runtime before returning records - enum TestYieldingStream { - New, - Yielded, - ReturnedBatch1, - ReturnedBatch2, - } - - impl Stream for TestYieldingStream { - type Item = ArrowResult; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match &*self { - TestYieldingStream::New => { - *(self.as_mut()) = TestYieldingStream::Yielded; - cx.waker().wake_by_ref(); - Poll::Pending - } - TestYieldingStream::Yielded => { - *(self.as_mut()) = TestYieldingStream::ReturnedBatch1; - Poll::Ready(Some(Ok(some_data().1[0].clone()))) - } - TestYieldingStream::ReturnedBatch1 => { - *(self.as_mut()) = TestYieldingStream::ReturnedBatch2; - Poll::Ready(Some(Ok(some_data().1[1].clone()))) - } - TestYieldingStream::ReturnedBatch2 => Poll::Ready(None), - } - } - } - - impl RecordBatchStream for TestYieldingStream { - fn schema(&self) -> SchemaRef { - some_data().0 - } - } - - //// Tests //// - - #[tokio::test] - async fn aggregate_source_not_yielding() -> Result<()> { - let input: Arc = - Arc::new(TestYieldingExec { yield_first: false }); - - check_aggregates(input).await - } - - #[tokio::test] - async fn aggregate_source_with_yielding() -> Result<()> { - let input: Arc = - Arc::new(TestYieldingExec { yield_first: true }); - - check_aggregates(input).await - } - - #[tokio::test] - async fn test_drop_cancel_without_groups() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); - - let groups = vec![]; - - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("a", &schema)?, - "AVG(a)".to_string(), - DataType::Float64, - ))]; - - let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); - let refs = blocking_exec.refs(); - let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates.clone(), - blocking_exec, - schema, - )?); - - let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx); - let mut fut = fut.boxed(); - - assert_is_pending(&mut fut); - drop(fut); - assert_strong_count_converges_to_zero(refs).await; - - Ok(()) - } - - #[tokio::test] - async fn test_drop_cancel_with_groups() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Float32, true), - Field::new("b", DataType::Float32, true), - ])); - - let groups: Vec<(Arc, String)> = - vec![(col("a", &schema)?, "a".to_string())]; - - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("b", &schema)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; - - let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); - let refs = blocking_exec.refs(); - let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates.clone(), - blocking_exec, - schema, - )?); - - let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx); - let mut fut = fut.boxed(); -
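// dropping the pending future must cancel execution and release all - // strong references to the blocking input -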
assert_is_pending(&mut fut); - drop(fut); - assert_strong_count_converges_to_zero(refs).await; - - Ok(()) - } -} diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index b7b25a636efc..dc963c7e1bdc 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -555,7 +555,6 @@ pub use datafusion_physical_expr::expressions; pub mod file_format; pub mod filter; pub mod functions; -pub mod hash_aggregate; pub mod hash_join; pub mod hash_utils; pub mod join_utils; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 84785777b016..2c84d3baac44 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -34,6 +34,7 @@ use crate::logical_plan::{ }; use crate::logical_plan::{Limit, Values}; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; +use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions; @@ -41,7 +42,6 @@ use crate::physical_plan::expressions::{ CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr, }; use crate::physical_plan::filter::FilterExec; -use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; @@ -524,7 +524,7 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; - let initial_aggr = Arc::new(HashAggregateExec::try_new( + let initial_aggr = Arc::new(AggregateExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), @@ -566,7 +566,7 @@ impl DefaultPhysicalPlanner { (initial_aggr, AggregateMode::Final) }; - Ok(Arc::new(HashAggregateExec::try_new( + Ok(Arc::new(AggregateExec::try_new( next_partition_mode, final_group .iter() @@ -1839,7 +1839,7 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let final_hash_agg = execution_plan .as_any() - .downcast_ref::() + .downcast_ref::() .expect("hash aggregate"); assert_eq!( "SUM(aggregate_test_100.c2)", @@ -1873,7 +1873,7 @@ mod tests { let formatted = format!("{:?}", execution_plan); // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in HashAggregate (which is slower) + // mode in Aggregate (which is slower) assert!(formatted.contains("FinalPartitioned")); Ok(()) diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs index b5ea5477ead2..f5c25dbadce0 100644 --- a/datafusion/core/tests/sql/avro.rs +++ b/datafusion/core/tests/sql/avro.rs @@ -150,9 +150,9 @@ async fn avro_explain() { vec![ "physical_plan", "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\ - \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ + \n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ \n CoalescePartitionsExec\ - \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ + \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ \n AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\ \n", diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index a124311aa4ff..f72e0f8f8120 100644 --- 
a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -51,12 +51,12 @@ async fn explain_analyze_baseline_metrics() { assert_metrics!( &formatted, - "HashAggregateExec: mode=Partial, gby=[]", + "AggregateExec: mode=Partial, gby=[]", "metrics=[output_rows=3, elapsed_compute=" ); assert_metrics!( &formatted, - "HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( @@ -110,7 +110,7 @@ async fn explain_analyze_baseline_metrics() { use datafusion::physical_plan::sorts; plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() // CoalescePartitionsExec doesn't do any work so is not included || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() @@ -660,10 +660,10 @@ async fn test_physical_plan_display_indent() { " SortExec: [the_min@2 DESC]", " CoalescePartitionsExec", " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", - " HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", + " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)", - " HashAggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", + " AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", " FilterExec: c12@1 < CAST(10 AS Float64)", " RepartitionExec: partitioning=RoundRobinBatch(3)", diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index e2209b8764fb..79deaae79ab3 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -92,9 +92,9 @@ async fn json_explain() { vec![ "physical_plan", "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\ - \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ + \n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ \n CoalescePartitionsExec\ - \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ + \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ \n JsonExec: limit=None, files=[tests/jsons/2.json]\n", ],
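To make the renamed API concrete, here is a minimal, self-contained sketch of the two-phase plan exercised above (Partial, then CoalescePartitionsExec, then Final), following the calls shown in this diff. Using MemoryExec as the input source and a tokio runtime for the async execute() are assumptions of the sketch, not part of the PR:

use std::sync::Arc;

use datafusion::arrow::array::{Float64Array, UInt32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::{col, Avg};
use datafusion::physical_plan::memory::MemoryExec; // assumed source operator
use datafusion::physical_plan::{common, AggregateExpr, ExecutionPlan, PhysicalExpr};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // (a: UInt32, b: Float64), as in the tests above
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::UInt32, false),
        Field::new("b", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(UInt32Array::from(vec![2u32, 3, 4, 4])),
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
        ],
    )?;
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?);

    let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(col("a", &schema)?, "a".to_string())];
    let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
        col("b", &schema)?,
        "AVG(b)".to_string(),
        DataType::Float64,
    ))];

    // Phase 1: per-partition accumulator states ([a, AVG(b)[count], AVG(b)[sum]])
    let partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        groups.clone(),
        aggregates.clone(),
        input,
        schema.clone(),
    )?);

    // Phase 2: merge all partial states into the final values ([a, AVG(b)])
    let final_group: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(col("a", &partial.schema())?, "a".to_string())];
    let merged = Arc::new(AggregateExec::try_new(
        AggregateMode::Final,
        final_group,
        aggregates,
        Arc::new(CoalescePartitionsExec::new(partial)),
        schema,
    )?);

    let task_ctx = SessionContext::new().task_ctx();
    let batches = common::collect(merged.execute(0, task_ctx).await?).await?;
    println!("{:?}", batches);
    Ok(())
}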