From e6628149056c7543bff8faba711b4010f5789a72 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 25 Sep 2024 15:36:48 -0400 Subject: [PATCH 01/23] Add test that invokes bloom_filter_agg. --- .../apache/comet/exec/CometExecSuite.scala | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 05aa23723..78f59cbea 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -31,10 +31,10 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Column, CometTestBase, DataFrame, DataFrameWriter, Row, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.Hex -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Hex} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, BloomFilterAggregate} import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometSparkToColumnarExec, CometTakeOrderedAndProjectExec} import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec} @@ -911,6 +911,29 @@ class CometExecSuite extends CometTestBase { } } + test("bloom_filter_agg") { + val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") + spark.sessionState.functionRegistry.registerFunction( + funcId_bloom_filter_agg, + new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"), + (children: Seq[Expression]) => + children.size match { + case 1 => new BloomFilterAggregate(children.head) + case 2 => new BloomFilterAggregate(children.head, children(1)) + case 3 => new BloomFilterAggregate(children.head, children(1), children(2)) + }) + + withParquetTable( + (0 until 100) + .map(_ => (Random.nextInt(), Random.nextInt() % 5)), + "tbl") { + val df = sql("SELECT bloom_filter_agg(cast(_2 as long)) FROM tbl") + checkSparkAnswerAndOperator(df) + } + + spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg) + } + test("sort (non-global)") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) From 20f6e671274604186512af3e7e0620b39d3b7603 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 25 Sep 2024 16:58:22 -0400 Subject: [PATCH 02/23] QueryPlanSerde support for BloomFilterAgg. 
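
This commit only adds the serde plumbing: QueryPlanSerde maps Spark's BloomFilterAggregate onto a
new BloomFilterAgg protobuf message, and the native planner still stubs the new AggExprStruct
variant out with a GeneralError. If the child, numItems, numBits, or result datatype fails to
serialize, the aggregate is tagged via withInfo and Comet falls back to Spark. For orientation,
a follow-up commit in this series fills the planner arm in along these lines (sketch only; the
actual code appears later in the series):

    AggExprStruct::BloomFilterAgg(expr) => {
        // Decode the three child expressions and the result type carried by the message.
        let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
        let num_items =
            self.create_expr(expr.num_items.as_ref().unwrap(), Arc::clone(&schema))?;
        let num_bits =
            self.create_expr(expr.num_bits.as_ref().unwrap(), Arc::clone(&schema))?;
        let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
        // BloomFilterAgg is the native AggregateUDFImpl added in a later commit.
        let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
            Arc::clone(&child),
            Arc::clone(&num_items),
            Arc::clone(&num_bits),
            "bloom_filter_agg",
            datatype,
        ));
        Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func)
    }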
--- .../core/src/execution/datafusion/planner.rs | 3 ++ native/proto/src/proto/expr.proto | 10 ++++++ .../apache/comet/serde/QueryPlanSerde.scala | 35 ++++++++++++++++++- 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 9000db61e..01decfaf6 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1607,6 +1607,9 @@ impl PhysicalPlanner { )); Self::create_aggr_func_expr("correlation", schema, vec![child1, child2], func) } + AggExprStruct::BloomFilterAgg(expr) => Err(ExecutionError::GeneralError( + "BloomFilterAgg not implemented yet".to_string(), + )), } } diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 88940f386..1ae421c3e 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -100,6 +100,7 @@ message AggExpr { Variance variance = 13; Stddev stddev = 14; Correlation correlation = 15; + BloomFilterAgg bloomFilterAgg = 16; } } @@ -191,6 +192,15 @@ message Correlation { DataType datatype = 4; } +message BloomFilterAgg { + Expr child = 1; + Expr numItems = 2; + Expr numBits = 3; + int32 mutableBufferOffset = 4; + int32 inputBufferOffset = 5; + DataType datatype = 6; +} + message Literal { oneof value { bool bool_val = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 51b32b7df..5e37461fc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Complete, Corr, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, Complete, Corr, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ @@ -760,6 +760,39 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(aggExpr, child1, child2) None } + + case bloom_filter @ BloomFilterAggregate( + child, + numItems, + numBits, + mutableBufferOffset, + inputBufferOffset) => { + val childExpr = exprToProto(child, inputs, binding) + val numItemsExpr = exprToProto(numItems, inputs, binding) + val numBitsExpr = exprToProto(numBits, inputs, binding) + val dataType = serializeDataType(bloom_filter.dataType) + + if (childExpr.isDefined && numItemsExpr.isDefined && + numBitsExpr.isDefined && dataType.isDefined) { + val bloomFilterAggBuilder = ExprOuterClass.BloomFilterAgg.newBuilder() + bloomFilterAggBuilder.setChild(childExpr.get) + bloomFilterAggBuilder.setNumItems(childExpr.get) + bloomFilterAggBuilder.setNumBits(childExpr.get) + bloomFilterAggBuilder.setMutableBufferOffset(mutableBufferOffset) + 
bloomFilterAggBuilder.setInputBufferOffset(inputBufferOffset) + bloomFilterAggBuilder.setDatatype(dataType.get) + + Some( + ExprOuterClass.AggExpr + .newBuilder() + .setBloomFilterAgg(bloomFilterAggBuilder) + .build()) + } else { + withInfo(aggExpr, child, numItems, numBits) + None + } + } + case fn => val msg = s"unsupported Spark aggregate function: ${fn.prettyName}" emitWarning(msg) From 1ec31a2ce3ee965a135541b3b793f937044d8a8f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 27 Sep 2024 09:45:29 -0400 Subject: [PATCH 03/23] Add bloom_filter_agg based on sample UDAF. planner instantiates it now. Added spark_bit_array_tests. --- .../expressions/bloom_filter_agg.rs | 190 ++++++++++++++++++ .../execution/datafusion/expressions/mod.rs | 1 + .../core/src/execution/datafusion/planner.rs | 22 +- .../datafusion/util/spark_bit_array.rs | 26 +++ 4 files changed, 236 insertions(+), 3 deletions(-) create mode 100644 native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs new file mode 100644 index 000000000..9a610fb28 --- /dev/null +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::{Field, Schema}; +use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; +use datafusion_physical_expr::NullState; +use std::{any::Any, sync::Arc}; + +use arrow::{ + array::{ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array}, + datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type}, + record_batch::RecordBatch, +}; +use datafusion::error::Result; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::prelude::*; +use datafusion_common::{cast::as_float64_array, ScalarValue}; +use datafusion_expr::{ + function::{AccumulatorArgs, StateFieldsArgs}, + Accumulator, AggregateUDF, AggregateUDFImpl, Signature, +}; + +/// This example shows how to use the full AggregateUDFImpl API to implement a user +/// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements +/// a function `accumulator` that returns the `Accumulator` instance. +/// +/// To do so, we must implement the `AggregateUDFImpl` trait. 
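+///
+/// NOTE: the `GeometricMean` accumulator below is copied from DataFusion's sample UDAF and only
+/// serves as a scaffold; later commits in this series replace it with Spark-compatible bloom
+/// filter semantics backed by `SparkBloomFilter` / `SparkBitArray`.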
+#[derive(Debug, Clone)] +pub struct BloomFilterAgg { + signature: Signature, +} + +impl BloomFilterAgg { + pub fn new( + expr: Arc, + num_items: Arc, + num_bits: Arc, + name: impl Into, + data_type: DataType, + mutable_buffer_offset: i32, + input_buffer_offset: i32, + ) -> Self { + assert!(matches!(data_type, DataType::Binary)); + Self { + signature: Signature::exact( + vec![DataType::Int64], + Volatility::Immutable, + ), + } + } +} + +impl AggregateUDFImpl for BloomFilterAgg { + /// We implement as_any so that we can downcast the AggregateUDFImpl trait object + fn as_any(&self) -> &dyn Any { + self + } + + /// Return the name of this function + fn name(&self) -> &str { + "geo_mean" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + /// This is the accumulator factory; DataFusion uses it to create new accumulators. + /// + /// This is the accumulator factory for row wise accumulation; Even when `GroupsAccumulator` + /// is supported, DataFusion will use this row oriented + /// accumulator when the aggregate function is used as a window function + /// or when there are only aggregates (no GROUP BY columns) in the plan. + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + Ok(Box::new(GeometricMean::new())) + } + + /// This is the description of the state. accumulator's state() must match the types here. + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + Ok(vec![ + Field::new("prod", args.return_type.clone(), true), + Field::new("n", DataType::UInt32, true), + ]) + } + + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + false + } +} + +/// A UDAF has state across multiple rows, and thus we require a `struct` with that state. +#[derive(Debug)] +struct GeometricMean { + n: u32, + prod: f64, +} + +impl GeometricMean { + // how the struct is initialized + pub fn new() -> Self { + GeometricMean { n: 0, prod: 1.0 } + } +} + +// UDAFs are built using the trait `Accumulator`, that offers DataFusion the necessary functions +// to use them. +impl Accumulator for GeometricMean { + // This function serializes our state to `ScalarValue`, which DataFusion uses + // to pass this state between execution stages. + // Note that this can be arbitrary data. + fn state(&mut self) -> Result> { + Ok(vec![ + ScalarValue::from(self.prod), + ScalarValue::from(self.n), + ]) + } + + // DataFusion expects this function to return the final value of this aggregator. + // in this case, this is the formula of the geometric mean + fn evaluate(&mut self) -> Result { + let value = self.prod.powf(1.0 / self.n as f64); + Ok(ScalarValue::from(value)) + } + + // DataFusion calls this function to update the accumulator's state for a batch + // of inputs rows. 
In this case the product is updated with values from the first column + // and the count is updated based on the row count + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + let arr = &values[0]; + (0..arr.len()).try_for_each(|index| { + let v = ScalarValue::try_from_array(arr, index)?; + + if let ScalarValue::Float64(Some(value)) = v { + self.prod *= value; + self.n += 1; + } else { + unreachable!("") + } + Ok(()) + }) + } + + // Merge the output of `Self::state()` from other instances of this accumulator + // into this accumulator's state + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if states.is_empty() { + return Ok(()); + } + let arr = &states[0]; + (0..arr.len()).try_for_each(|index| { + let v = states + .iter() + .map(|array| ScalarValue::try_from_array(array, index)) + .collect::>>()?; + if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) + { + self.prod *= prod; + self.n += n; + } else { + unreachable!("") + } + Ok(()) + }) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs index 10c9d3092..48b80384b 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/datafusion/expressions/mod.rs @@ -25,6 +25,7 @@ pub use normalize_nan::NormalizeNaNAndZero; use crate::errors::CometError; pub mod avg; pub mod avg_decimal; +pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; pub mod comet_scalar_funcs; pub mod correlation; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 01decfaf6..8f907d000 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -28,6 +28,7 @@ use crate::{ avg::Avg, avg_decimal::AvgDecimal, bitwise_not::BitwiseNotExpr, + bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow, correlation::Correlation, @@ -1607,9 +1608,24 @@ impl PhysicalPlanner { )); Self::create_aggr_func_expr("correlation", schema, vec![child1, child2], func) } - AggExprStruct::BloomFilterAgg(expr) => Err(ExecutionError::GeneralError( - "BloomFilterAgg not implemented yet".to_string(), - )), + AggExprStruct::BloomFilterAgg(expr) => { + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; + let num_items = + self.create_expr(expr.num_items.as_ref().unwrap(), Arc::clone(&schema))?; + let num_bits = + self.create_expr(expr.num_bits.as_ref().unwrap(), Arc::clone(&schema))?; + let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); + let func = AggregateUDF::new_from_impl(BloomFilterAgg::new( + Arc::clone(&child), + Arc::clone(&num_items), + Arc::clone(&num_bits), + "bloom_filter_agg", + datatype, + expr.mutable_buffer_offset, + expr.input_buffer_offset, + )); + Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func) + } } } diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 9729627df..19f00b200 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -128,4 +128,30 @@ mod test { // check cardinality assert_eq!(array.cardinality(), 6); } + + #[test] + fn 
test_spark_bit_with_empty_buffer() { + let buf = vec![0u64; 4]; + let array = SparkBitArray::new(buf); + + assert_eq!(array.bit_size(), 256); + assert_eq!(array.cardinality(), 0); + + for n in 0..256 { + assert!(!array.get(n)); + } + } + + #[test] + fn test_spark_bit_with_full_buffer() { + let buf = vec![u64::MAX; 4]; + let array = SparkBitArray::new(buf); + + assert_eq!(array.bit_size(), 256); + assert_eq!(array.cardinality(), 256); + + for n in 0..256 { + assert!(array.get(n)); + } + } } From 3965dc4dbc3d0d0aec3a1643a7ece0f4fc384214 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 27 Sep 2024 17:49:29 -0400 Subject: [PATCH 04/23] Partial work on Accumulator. Need to finish merge_batch and state. --- .../expressions/bloom_filter_agg.rs | 151 +++++++++--------- .../expressions/bloom_filter_might_contain.rs | 2 +- .../core/src/execution/datafusion/planner.rs | 2 - .../datafusion/util/spark_bit_array.rs | 4 + .../datafusion/util/spark_bloom_filter.rs | 31 +++- .../apache/comet/serde/QueryPlanSerde.scala | 4 +- 6 files changed, 114 insertions(+), 80 deletions(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 9a610fb28..2609246c3 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -20,6 +20,8 @@ use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; use datafusion_physical_expr::NullState; use std::{any::Any, sync::Arc}; +use crate::execution::datafusion::util::spark_bloom_filter; +use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter; use arrow::{ array::{ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array}, datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type}, @@ -33,15 +35,15 @@ use datafusion_expr::{ function::{AccumulatorArgs, StateFieldsArgs}, Accumulator, AggregateUDF, AggregateUDFImpl, Signature, }; +use datafusion_physical_expr::expressions::Literal; -/// This example shows how to use the full AggregateUDFImpl API to implement a user -/// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements -/// a function `accumulator` that returns the `Accumulator` instance. -/// -/// To do so, we must implement the `AggregateUDFImpl` trait. 
#[derive(Debug, Clone)] pub struct BloomFilterAgg { + name: String, signature: Signature, + expr: Arc, + num_items: i32, + num_bits: i32, } impl BloomFilterAgg { @@ -51,15 +53,31 @@ impl BloomFilterAgg { num_bits: Arc, name: impl Into, data_type: DataType, - mutable_buffer_offset: i32, - input_buffer_offset: i32, ) -> Self { assert!(matches!(data_type, DataType::Binary)); + let num_items = match num_items + .as_any() + .downcast_ref::() + .unwrap() + .value() + { + ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, + _ => { + unreachable!() + } + }; + let num_bits = match num_bits.as_any().downcast_ref::().unwrap().value() { + ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, + _ => { + unreachable!() + } + }; Self { - signature: Signature::exact( - vec![DataType::Int64], - Volatility::Immutable, - ), + name: name.into(), + signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable), + expr, + num_items: num_items, + num_bits: num_bits, } } } @@ -72,7 +90,7 @@ impl AggregateUDFImpl for BloomFilterAgg { /// Return the name of this function fn name(&self) -> &str { - "geo_mean" + "bloom_filter_agg" } fn signature(&self) -> &Signature { @@ -90,15 +108,15 @@ impl AggregateUDFImpl for BloomFilterAgg { /// accumulator when the aggregate function is used as a window function /// or when there are only aggregates (no GROUP BY columns) in the plan. fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { - Ok(Box::new(GeometricMean::new())) + Ok(Box::new(SparkBloomFilter::from(( + spark_bloom_filter::optimal_num_hash_functions(self.num_items, self.num_bits), + self.num_bits, + )))) } /// This is the description of the state. accumulator's state() must match the types here. fn state_fields(&self, args: StateFieldsArgs) -> Result> { - Ok(vec![ - Field::new("prod", args.return_type.clone(), true), - Field::new("n", DataType::UInt32, true), - ]) + Ok(vec![Field::new("bits", DataType::Binary, false)]) } fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { @@ -106,40 +124,9 @@ impl AggregateUDFImpl for BloomFilterAgg { } } -/// A UDAF has state across multiple rows, and thus we require a `struct` with that state. -#[derive(Debug)] -struct GeometricMean { - n: u32, - prod: f64, -} - -impl GeometricMean { - // how the struct is initialized - pub fn new() -> Self { - GeometricMean { n: 0, prod: 1.0 } - } -} - // UDAFs are built using the trait `Accumulator`, that offers DataFusion the necessary functions // to use them. -impl Accumulator for GeometricMean { - // This function serializes our state to `ScalarValue`, which DataFusion uses - // to pass this state between execution stages. - // Note that this can be arbitrary data. - fn state(&mut self) -> Result> { - Ok(vec![ - ScalarValue::from(self.prod), - ScalarValue::from(self.n), - ]) - } - - // DataFusion expects this function to return the final value of this aggregator. - // in this case, this is the formula of the geometric mean - fn evaluate(&mut self) -> Result { - let value = self.prod.powf(1.0 / self.n as f64); - Ok(ScalarValue::from(value)) - } - +impl Accumulator for SparkBloomFilter { // DataFusion calls this function to update the accumulator's state for a batch // of inputs rows. 
In this case the product is updated with values from the first column // and the count is updated based on the row count @@ -151,9 +138,8 @@ impl Accumulator for GeometricMean { (0..arr.len()).try_for_each(|index| { let v = ScalarValue::try_from_array(arr, index)?; - if let ScalarValue::Float64(Some(value)) = v { - self.prod *= value; - self.n += 1; + if let ScalarValue::Int64(Some(value)) = v { + self.put_long(value); } else { unreachable!("") } @@ -161,30 +147,49 @@ impl Accumulator for GeometricMean { }) } - // Merge the output of `Self::state()` from other instances of this accumulator - // into this accumulator's state - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if states.is_empty() { - return Ok(()); - } - let arr = &states[0]; - (0..arr.len()).try_for_each(|index| { - let v = states - .iter() - .map(|array| ScalarValue::try_from_array(array, index)) - .collect::>>()?; - if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) - { - self.prod *= prod; - self.n += n; - } else { - unreachable!("") - } - Ok(()) - }) + // DataFusion expects this function to return the final value of this aggregator. + // in this case, this is the formula of the geometric mean + fn evaluate(&mut self) -> Result { + // TODO(Matt): Finish this. + // let value = self.prod.powf(1.0 / self.n as f64); + // Ok(ScalarValue::from(value)) + Ok(ScalarValue::from(0)) } fn size(&self) -> usize { std::mem::size_of_val(self) } + + // This function serializes our state to `ScalarValue`, which DataFusion uses + // to pass this state between execution stages. + // Note that this can be arbitrary data. + fn state(&mut self) -> Result> { + // TODO(Matt): Finish this. + Ok(vec![ScalarValue::from(0), ScalarValue::from(0)]) + } + + // Merge the output of `Self::state()` from other instances of this accumulator + // into this accumulator's state + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + // TODO(Matt): Finish this. 
+ // if states.is_empty() { + // return Ok(()); + // } + // let arr = &states[0]; + // (0..arr.len()).try_for_each(|index| { + // let v = states + // .iter() + // .map(|array| ScalarValue::try_from_array(array, index)) + // .collect::>>()?; + // if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) + // { + // self.prod *= prod; + // self.n += n; + // } else { + // unreachable!("") + // } + // Ok(()) + // }) + Ok(()) + } } diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs index 462a22247..de922d831 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs @@ -72,7 +72,7 @@ fn evaluate_bloom_filter( let bloom_filter_bytes = bloom_filter_expr.evaluate(&batch)?; match bloom_filter_bytes { ColumnarValue::Scalar(ScalarValue::Binary(v)) => { - Ok(v.map(|v| SparkBloomFilter::new(v.as_bytes()))) + Ok(v.map(|v| SparkBloomFilter::from(v.as_bytes()))) } _ => internal_err!("Bloom filter expression should be evaluated as a scalar binary value"), } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 8f907d000..c9601bdee 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1621,8 +1621,6 @@ impl PhysicalPlanner { Arc::clone(&num_bits), "bloom_filter_agg", datatype, - expr.mutable_buffer_offset, - expr.input_buffer_offset, )); Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func) } diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 19f00b200..02d9943fa 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -63,6 +63,10 @@ impl SparkBitArray { } } +pub fn num_words(num_bits: i32) -> i32 { + (num_bits as f64 / 64.0).ceil() as i32 +} + #[cfg(test)] mod test { use super::*; diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs index 00f717676..21a56dd27 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::execution::datafusion::util::spark_bit_array; use crate::execution::datafusion::util::spark_bit_array::SparkBitArray; use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array}; use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash; +use std::cmp; const SPARK_BLOOM_FILTER_VERSION_1: i32 = 1; @@ -30,8 +32,31 @@ pub struct SparkBloomFilter { num_hash_functions: u32, } -impl SparkBloomFilter { - pub fn new(buf: &[u8]) -> Self { +static DEFAULT_FPP: f64 = 0.03; + +pub fn optimal_num_hash_functions(expected_items: i32, num_bits: i32) -> i32 { + cmp::max( + 1, + ((num_bits as f64 / expected_items as f64) * 2.0_f64.ln()).round() as i32, + ) +} + +impl From<(i32, i32)> for SparkBloomFilter { + /// Creates an empty SparkBloomFilter given number of hash functions and bits. 
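+    /// The backing `SparkBitArray` is sized to `num_words(num_bits)` 64-bit words, i.e.
+    /// `num_bits` rounded up to the next multiple of 64.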
+ fn from((num_hash_functions, num_bits): (i32, i32)) -> Self { + let num_words = spark_bit_array::num_words(num_bits); + let bits = vec![0u64; num_words as usize]; + Self { + bits: SparkBitArray::new(bits), + num_hash_functions: num_hash_functions as u32, + } + } +} + +impl From<&[u8]> for SparkBloomFilter { + /// Creates a SparkBloomFilter from a serialized byte array conforming to Spark's BloomFilter + /// binary format version 1. + fn from(buf: &[u8]) -> Self { let mut offset = 0; let version = read_num_be_bytes!(i32, 4, buf[offset..]); offset += 4; @@ -54,7 +79,9 @@ impl SparkBloomFilter { num_hash_functions: num_hash_functions as u32, } } +} +impl SparkBloomFilter { pub fn put_long(&mut self, item: i64) -> bool { // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce // n hash values by `h1 + i * h2` with 1 <= i <= num_hash_functions. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5e37461fc..0c1380d80 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -776,8 +776,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim numBitsExpr.isDefined && dataType.isDefined) { val bloomFilterAggBuilder = ExprOuterClass.BloomFilterAgg.newBuilder() bloomFilterAggBuilder.setChild(childExpr.get) - bloomFilterAggBuilder.setNumItems(childExpr.get) - bloomFilterAggBuilder.setNumBits(childExpr.get) + bloomFilterAggBuilder.setNumItems(numItemsExpr.get) + bloomFilterAggBuilder.setNumBits(numBitsExpr.get) bloomFilterAggBuilder.setMutableBufferOffset(mutableBufferOffset) bloomFilterAggBuilder.setInputBufferOffset(inputBufferOffset) bloomFilterAggBuilder.setDatatype(dataType.get) From 62e656cc56b3388174208426e2007535e0712968 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 30 Sep 2024 14:00:42 -0400 Subject: [PATCH 05/23] BloomFilterAgg state, merge_state, and evaluate. Need more tests. --- .../expressions/bloom_filter_agg.rs | 74 +++++-------------- .../datafusion/util/spark_bit_array.rs | 61 ++++++++++++++- .../datafusion/util/spark_bloom_filter.rs | 21 ++++++ 3 files changed, 99 insertions(+), 57 deletions(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 2609246c3..8c1256910 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -15,25 +15,20 @@ // specific language governing permissions and limitations // under the License. 
-use arrow_schema::{Field, Schema}; +use arrow_schema::Field; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use datafusion_physical_expr::NullState; use std::{any::Any, sync::Arc}; use crate::execution::datafusion::util::spark_bloom_filter; use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter; -use arrow::{ - array::{ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array}, - datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type}, - record_batch::RecordBatch, -}; +use arrow::array::ArrayRef; +use arrow_array::BinaryArray; use datafusion::error::Result; use datafusion::physical_expr::PhysicalExpr; -use datafusion::prelude::*; -use datafusion_common::{cast::as_float64_array, ScalarValue}; +use datafusion_common::{downcast_value, DataFusionError, ScalarValue}; use datafusion_expr::{ function::{AccumulatorArgs, StateFieldsArgs}, - Accumulator, AggregateUDF, AggregateUDFImpl, Signature, + Accumulator, AggregateUDFImpl, Signature, }; use datafusion_physical_expr::expressions::Literal; @@ -83,12 +78,10 @@ impl BloomFilterAgg { } impl AggregateUDFImpl for BloomFilterAgg { - /// We implement as_any so that we can downcast the AggregateUDFImpl trait object fn as_any(&self) -> &dyn Any { self } - /// Return the name of this function fn name(&self) -> &str { "bloom_filter_agg" } @@ -101,12 +94,6 @@ impl AggregateUDFImpl for BloomFilterAgg { Ok(DataType::Binary) } - /// This is the accumulator factory; DataFusion uses it to create new accumulators. - /// - /// This is the accumulator factory for row wise accumulation; Even when `GroupsAccumulator` - /// is supported, DataFusion will use this row oriented - /// accumulator when the aggregate function is used as a window function - /// or when there are only aggregates (no GROUP BY columns) in the plan. fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { Ok(Box::new(SparkBloomFilter::from(( spark_bloom_filter::optimal_num_hash_functions(self.num_items, self.num_bits), @@ -115,7 +102,7 @@ impl AggregateUDFImpl for BloomFilterAgg { } /// This is the description of the state. accumulator's state() must match the types here. - fn state_fields(&self, args: StateFieldsArgs) -> Result> { + fn state_fields(&self, _args: StateFieldsArgs) -> Result> { Ok(vec![Field::new("bits", DataType::Binary, false)]) } @@ -124,12 +111,7 @@ impl AggregateUDFImpl for BloomFilterAgg { } } -// UDAFs are built using the trait `Accumulator`, that offers DataFusion the necessary functions -// to use them. impl Accumulator for SparkBloomFilter { - // DataFusion calls this function to update the accumulator's state for a batch - // of inputs rows. In this case the product is updated with values from the first column - // and the count is updated based on the row count fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { return Ok(()); @@ -147,49 +129,29 @@ impl Accumulator for SparkBloomFilter { }) } - // DataFusion expects this function to return the final value of this aggregator. - // in this case, this is the formula of the geometric mean fn evaluate(&mut self) -> Result { - // TODO(Matt): Finish this. - // let value = self.prod.powf(1.0 / self.n as f64); - // Ok(ScalarValue::from(value)) - Ok(ScalarValue::from(0)) + // TODO(Matt): There's got to be a more efficient way to do this. 
+ let mut spark_bloom_filter: Vec = 1_u32.to_be_bytes().to_vec(); + spark_bloom_filter.append(&mut self.num_hash_functions().to_be_bytes().to_vec()); + spark_bloom_filter.append(&mut (self.state_size_words() as u32).to_be_bytes().to_vec()); + spark_bloom_filter.append(&mut self.state_as_bytes()); + Ok(ScalarValue::Binary(Some(spark_bloom_filter))) } fn size(&self) -> usize { std::mem::size_of_val(self) } - // This function serializes our state to `ScalarValue`, which DataFusion uses - // to pass this state between execution stages. - // Note that this can be arbitrary data. fn state(&mut self) -> Result> { - // TODO(Matt): Finish this. - Ok(vec![ScalarValue::from(0), ScalarValue::from(0)]) + // TODO(Matt): There might be a more efficient way to do this. Right now it's deep copying + // SparkBitArray's Vec to Vec. I think ScalarValue then deep copies the Vec. + let state_sv = ScalarValue::Binary(Some(self.state_as_bytes())); + Ok(vec![state_sv]) } - // Merge the output of `Self::state()` from other instances of this accumulator - // into this accumulator's state fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - // TODO(Matt): Finish this. - // if states.is_empty() { - // return Ok(()); - // } - // let arr = &states[0]; - // (0..arr.len()).try_for_each(|index| { - // let v = states - // .iter() - // .map(|array| ScalarValue::try_from_array(array, index)) - // .collect::>>()?; - // if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) = (&v[0], &v[1]) - // { - // self.prod *= prod; - // self.n += n; - // } else { - // unreachable!("") - // } - // Ok(()) - // }) + let state_sv = downcast_value!(states[0], BinaryArray); + self.merge_filter(state_sv.value_data()); Ok(()) } } diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 02d9943fa..60eda4539 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::iter::zip; +use arrow_buffer::ToByteSlice; + /// A simple bit array implementation that simulates the behavior of Spark's BitArray which is /// used in the BloomFilter implementation. Some methods are not implemented as they are not /// required for the current use case. 
@@ -55,12 +58,35 @@ impl SparkBitArray { } pub fn bit_size(&self) -> u64 { - self.data.len() as u64 * 64 + self.word_size() as u64 * 64 + } + + pub fn byte_size(&self) -> usize { + self.word_size() * 8 + } + + pub fn word_size(&self) -> usize { + self.data.len() } pub fn cardinality(&self) -> usize { self.bit_count } + + pub fn to_bytes(&self) -> Vec { + Vec::from(self.data.to_byte_slice()) + } + + pub fn to_bytes_not_vec(&self) -> &[u8] { + self.data.to_byte_slice() + } + + pub fn merge_bits(&mut self, other: &[u8]) { + assert_eq!(self.byte_size(), other.len()); + for i in zip(self.data.iter_mut(), other.chunks(8).map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap()))) { + *i.0 = *i.0 | i.1; + }; + } } pub fn num_words(num_bits: i32) -> i32 { @@ -158,4 +184,37 @@ mod test { assert!(array.get(n)); } } + + #[test] + fn test_spark_bit_merge() { + let buf1 = vec![0u64; 4]; + let mut array1 = SparkBitArray::new(buf1); + let buf2 = vec![0u64; 4]; + let mut array2 = SparkBitArray::new(buf2); + + + let primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251]; + let fibs = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233]; + + for n in fibs { + array1.set(n); + } + + for n in primes { + array2.set(n); + } + + assert_eq!(array1.cardinality(), fibs.len()); + assert_eq!(array2.cardinality(), primes.len()); + + array1.merge_bits(array2.to_bytes_not_vec()); + + for n in fibs { + assert!(array1.get(n)); + } + + for n in primes { + assert!(array1.get(n)); + } + } } diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs index 21a56dd27..5308d3d63 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs @@ -121,4 +121,25 @@ impl SparkBloomFilter { .map(|v| v.map(|x| self.might_contain_long(x))) .collect() } + + pub fn state_as_bytes(&self) -> Vec { + self.bits.to_bytes() + } + + pub fn state_size_words(&self) -> usize { + self.bits.word_size() + } + + pub fn num_hash_functions(&self) -> u32 { + self.num_hash_functions + } + + pub fn merge_filter(&mut self, other: &[u8]) { + assert_eq!( + other.len(), + self.bits.byte_size(), + "Cannot merge SparkBloomFilters with different lengths." + ); + self.bits.merge_bits(other); + } } From 33ef47dc98ae63d6f84a33ec24e506161b5bfd00 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 30 Sep 2024 14:31:12 -0400 Subject: [PATCH 06/23] Matches Spark behavior. Need to clean up the code quite a bit, and do `cargo clippy`. 
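
The behavioral fix is byte order: Spark writes its BloomFilter (format version 1) with big-endian
fields, so evaluate() now flips every 64-bit word with to_be() before appending it to the output
buffer, mirroring what the existing From<&[u8]> deserializer already reads. A minimal standalone
sketch of the same serialization, with names chosen here only for illustration:

    // Spark BloomFilter binary format, version 1 (all fields big-endian):
    //   i32 version (= 1)
    //   i32 numHashFunctions
    //   i32 numWords
    //   numWords x i64 words of the bit array
    fn serialize_spark_bloom_filter_v1(num_hash_functions: u32, words: &[u64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(12 + words.len() * 8);
        out.extend_from_slice(&1_i32.to_be_bytes());
        out.extend_from_slice(&(num_hash_functions as i32).to_be_bytes());
        out.extend_from_slice(&(words.len() as i32).to_be_bytes());
        for word in words {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out
    }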
--- .../expressions/bloom_filter_agg.rs | 8 ++++++- .../datafusion/util/spark_bit_array.rs | 22 ++++++++++++++----- .../datafusion/util/spark_bloom_filter.rs | 4 ++++ .../apache/comet/exec/CometExecSuite.scala | 2 +- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 8c1256910..6b913ebce 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -23,6 +23,7 @@ use crate::execution::datafusion::util::spark_bloom_filter; use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter; use arrow::array::ArrayRef; use arrow_array::BinaryArray; +use arrow_buffer::ToByteSlice; use datafusion::error::Result; use datafusion::physical_expr::PhysicalExpr; use datafusion_common::{downcast_value, DataFusionError, ScalarValue}; @@ -134,7 +135,12 @@ impl Accumulator for SparkBloomFilter { let mut spark_bloom_filter: Vec = 1_u32.to_be_bytes().to_vec(); spark_bloom_filter.append(&mut self.num_hash_functions().to_be_bytes().to_vec()); spark_bloom_filter.append(&mut (self.state_size_words() as u32).to_be_bytes().to_vec()); - spark_bloom_filter.append(&mut self.state_as_bytes()); + let mut filter_state: Vec = self.bits_state(); + for i in 0..filter_state.len() { + filter_state[i] = filter_state[i].to_be(); + } + // TODO(Matt): Flip the endianness of 64-bit words. + spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice())); Ok(ScalarValue::Binary(Some(spark_bloom_filter))) } diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 60eda4539..4d9321f20 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::iter::zip; use arrow_buffer::ToByteSlice; +use std::iter::zip; /// A simple bit array implementation that simulates the behavior of Spark's BitArray which is /// used in the BloomFilter implementation. 
Some methods are not implemented as they are not @@ -81,11 +81,20 @@ impl SparkBitArray { self.data.to_byte_slice() } + pub fn data(&self) -> Vec { + self.data.clone() + } + pub fn merge_bits(&mut self, other: &[u8]) { assert_eq!(self.byte_size(), other.len()); - for i in zip(self.data.iter_mut(), other.chunks(8).map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap()))) { + for i in zip( + self.data.iter_mut(), + other + .chunks(8) + .map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap())), + ) { *i.0 = *i.0 | i.1; - }; + } } } @@ -192,8 +201,11 @@ mod test { let buf2 = vec![0u64; 4]; let mut array2 = SparkBitArray::new(buf2); - - let primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251]; + let primes = [ + 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, + 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, + 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, + ]; let fibs = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233]; for n in fibs { diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs index 5308d3d63..d79cd5280 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs @@ -126,6 +126,10 @@ impl SparkBloomFilter { self.bits.to_bytes() } + pub fn bits_state(&self) -> Vec { + self.bits.data() + } + pub fn state_size_words(&self) -> usize { self.bits.word_size() } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 78f59cbea..92dc6565b 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -927,7 +927,7 @@ class CometExecSuite extends CometTestBase { (0 until 100) .map(_ => (Random.nextInt(), Random.nextInt() % 5)), "tbl") { - val df = sql("SELECT bloom_filter_agg(cast(_2 as long)) FROM tbl") + val df = sql("SELECT bloom_filter_agg(cast(_2 as long), cast(10 as long)) FROM tbl") checkSparkAnswerAndOperator(df) } From 599a8f9664bc31d8a7a3d82420592402196634e8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 30 Sep 2024 14:33:17 -0400 Subject: [PATCH 07/23] Remove old comment. --- .../src/execution/datafusion/expressions/bloom_filter_agg.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 6b913ebce..43ed5f0d1 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -139,7 +139,6 @@ impl Accumulator for SparkBloomFilter { for i in 0..filter_state.len() { filter_state[i] = filter_state[i].to_be(); } - // TODO(Matt): Flip the endianness of 64-bit words. spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice())); Ok(ScalarValue::Binary(Some(spark_bloom_filter))) } From a2a8cf358699b87e197f0c2e50f8dc315624335e Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 30 Sep 2024 16:02:29 -0400 Subject: [PATCH 08/23] Clippy. Increase bloom filter size back to Spark's default. 
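
Dropping the explicit numItems argument from the test means the aggregate is sized with Spark's
defaults again, which is what extract_i32_from_literal ultimately feeds into
optimal_num_hash_functions. As a worked example of that sizing formula,
k = max(1, round((num_bits / expected_items) * ln 2)), assuming the commonly cited Spark defaults
of 1,000,000 expected items and 8,388,608 bits (values quoted here for illustration only):

    // Rough arithmetic check of optimal_num_hash_functions() under assumed defaults.
    let expected_items = 1_000_000_f64; // assumed default, for illustration
    let num_bits = 8_388_608_f64;       // assumed default, for illustration
    let k = ((num_bits / expected_items) * 2.0_f64.ln()).round().max(1.0) as i32;
    assert_eq!(k, 6); // roughly 8.4 bits per item gives 6 hash functions

Under those assumed defaults that is a filter of about 1 MiB hashed 6 ways, which is why the
benchmark added later in this series passes the column cardinality as an explicit hint instead.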
--- .../expressions/bloom_filter_agg.rs | 34 +++++++------------ .../datafusion/util/spark_bit_array.rs | 2 +- .../apache/comet/exec/CometExecSuite.scala | 4 +-- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 43ed5f0d1..3da44dd2e 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -42,6 +42,15 @@ pub struct BloomFilterAgg { num_bits: i32, } +fn i32_from_literal_physical_expr(expr: Arc) -> i32 { + match expr.as_any().downcast_ref::().unwrap().value() { + ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, + _ => { + unreachable!() + } + } +} + impl BloomFilterAgg { pub fn new( expr: Arc, @@ -51,29 +60,12 @@ impl BloomFilterAgg { data_type: DataType, ) -> Self { assert!(matches!(data_type, DataType::Binary)); - let num_items = match num_items - .as_any() - .downcast_ref::() - .unwrap() - .value() - { - ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, - _ => { - unreachable!() - } - }; - let num_bits = match num_bits.as_any().downcast_ref::().unwrap().value() { - ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, - _ => { - unreachable!() - } - }; Self { name: name.into(), signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable), expr, - num_items: num_items, - num_bits: num_bits, + num_items: i32_from_literal_physical_expr(num_items), + num_bits: i32_from_literal_physical_expr(num_bits), } } } @@ -136,8 +128,8 @@ impl Accumulator for SparkBloomFilter { spark_bloom_filter.append(&mut self.num_hash_functions().to_be_bytes().to_vec()); spark_bloom_filter.append(&mut (self.state_size_words() as u32).to_be_bytes().to_vec()); let mut filter_state: Vec = self.bits_state(); - for i in 0..filter_state.len() { - filter_state[i] = filter_state[i].to_be(); + for i in filter_state.iter_mut() { + *i = i.to_be(); } spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice())); Ok(ScalarValue::Binary(Some(spark_bloom_filter))) diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 4d9321f20..8f42026c3 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -93,7 +93,7 @@ impl SparkBitArray { .chunks(8) .map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap())), ) { - *i.0 = *i.0 | i.1; + *i.0 |= i.1; } } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 92dc6565b..185cc0818 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -109,7 +109,7 @@ class CometExecSuite extends CometTestBase { sql( "CREATE VIEW lv_noalias AS SELECT myTab.* FROM src " + "LATERAL VIEW explode(map('key1', 100, 'key2', 200)) myTab LIMIT 2") - val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key"); + val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key") checkSparkAnswer(df) } } @@ -927,7 +927,7 @@ class CometExecSuite extends CometTestBase { (0 until 100) .map(_ => (Random.nextInt(), Random.nextInt() % 5)), "tbl") { - val df = sql("SELECT bloom_filter_agg(cast(_2 as long), cast(10 as 
long)) FROM tbl") + val df = sql("SELECT bloom_filter_agg(cast(_2 as long)) FROM tbl") checkSparkAnswerAndOperator(df) } From 22aedd9f4ef12816b8a6cd603361f7ef3ef5c950 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 30 Sep 2024 18:05:23 -0400 Subject: [PATCH 09/23] API cleanup. --- .../expressions/bloom_filter_agg.rs | 23 ++++----------- .../datafusion/util/spark_bloom_filter.rs | 29 +++++++++++-------- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index 3da44dd2e..4590bcd15 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -23,7 +23,6 @@ use crate::execution::datafusion::util::spark_bloom_filter; use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter; use arrow::array::ArrayRef; use arrow_array::BinaryArray; -use arrow_buffer::ToByteSlice; use datafusion::error::Result; use datafusion::physical_expr::PhysicalExpr; use datafusion_common::{downcast_value, DataFusionError, ScalarValue}; @@ -42,7 +41,7 @@ pub struct BloomFilterAgg { num_bits: i32, } -fn i32_from_literal_physical_expr(expr: Arc) -> i32 { +fn extract_i32_from_literal(expr: Arc) -> i32 { match expr.as_any().downcast_ref::().unwrap().value() { ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32, _ => { @@ -64,8 +63,8 @@ impl BloomFilterAgg { name: name.into(), signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable), expr, - num_items: i32_from_literal_physical_expr(num_items), - num_bits: i32_from_literal_physical_expr(num_bits), + num_items: extract_i32_from_literal(num_items), + num_bits: extract_i32_from_literal(num_bits), } } } @@ -94,7 +93,6 @@ impl AggregateUDFImpl for BloomFilterAgg { )))) } - /// This is the description of the state. accumulator's state() must match the types here. fn state_fields(&self, _args: StateFieldsArgs) -> Result> { Ok(vec![Field::new("bits", DataType::Binary, false)]) } @@ -123,16 +121,7 @@ impl Accumulator for SparkBloomFilter { } fn evaluate(&mut self) -> Result { - // TODO(Matt): There's got to be a more efficient way to do this. - let mut spark_bloom_filter: Vec = 1_u32.to_be_bytes().to_vec(); - spark_bloom_filter.append(&mut self.num_hash_functions().to_be_bytes().to_vec()); - spark_bloom_filter.append(&mut (self.state_size_words() as u32).to_be_bytes().to_vec()); - let mut filter_state: Vec = self.bits_state(); - for i in filter_state.iter_mut() { - *i = i.to_be(); - } - spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice())); - Ok(ScalarValue::Binary(Some(spark_bloom_filter))) + Ok(ScalarValue::Binary(Some(self.spark_serialization()))) } fn size(&self) -> usize { @@ -140,8 +129,8 @@ impl Accumulator for SparkBloomFilter { } fn state(&mut self) -> Result> { - // TODO(Matt): There might be a more efficient way to do this. Right now it's deep copying - // SparkBitArray's Vec to Vec. I think ScalarValue then deep copies the Vec. + // TODO(Matt): There might be a more efficient way to do this by transmuting since calling + // state() on an Accumulator is considered destructive. 
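+        // Note: state() ships only the raw bit-array bytes between the partial and final
+        // aggregation stages; the full Spark v1 header (version, numHashFunctions, numWords)
+        // is only produced by evaluate() via spark_serialization().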
let state_sv = ScalarValue::Binary(Some(self.state_as_bytes())); Ok(vec![state_sv]) } diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs index d79cd5280..bd48ab83b 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs @@ -18,6 +18,7 @@ use crate::execution::datafusion::util::spark_bit_array; use crate::execution::datafusion::util::spark_bit_array::SparkBitArray; use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array}; +use arrow_buffer::ToByteSlice; use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash; use std::cmp; @@ -82,6 +83,22 @@ impl From<&[u8]> for SparkBloomFilter { } impl SparkBloomFilter { + /// Serializes a SparkBloomFilter to a byte array conforming to Spark's BloomFilter + /// binary format version 1. + pub fn spark_serialization(&self) -> Vec { + // TODO(Matt): There must be a more efficient way to do this, even with all of the + // endianness stuff. + let mut spark_bloom_filter: Vec = 1_u32.to_be_bytes().to_vec(); + spark_bloom_filter.append(&mut self.num_hash_functions.to_be_bytes().to_vec()); + spark_bloom_filter.append(&mut (self.bits.word_size() as u32).to_be_bytes().to_vec()); + let mut filter_state: Vec = self.bits.data(); + for i in filter_state.iter_mut() { + *i = i.to_be(); + } + spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice())); + spark_bloom_filter + } + pub fn put_long(&mut self, item: i64) -> bool { // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce // n hash values by `h1 + i * h2` with 1 <= i <= num_hash_functions. @@ -126,18 +143,6 @@ impl SparkBloomFilter { self.bits.to_bytes() } - pub fn bits_state(&self) -> Vec { - self.bits.data() - } - - pub fn state_size_words(&self) -> usize { - self.bits.word_size() - } - - pub fn num_hash_functions(&self) -> u32 { - self.num_hash_functions - } - pub fn merge_filter(&mut self, other: &[u8]) { assert_eq!( other.len(), From bf2290254390166fbfa92052a2d02d4a2bd6a5d3 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 1 Oct 2024 08:09:21 -0400 Subject: [PATCH 10/23] API cleanup. 
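
With mutableBufferOffset and inputBufferOffset gone, the message carries only what the native side
actually consumes: the child expression, the two sizing literals, and the result type. For
reference, the pieces added in the previous commits compose roughly as follows (illustrative
values only; a free-standing sketch, not code from this patch):

    use crate::execution::datafusion::util::spark_bloom_filter::{
        optimal_num_hash_functions, SparkBloomFilter,
    };

    // Two partial accumulators, as if running on two partitions.
    let k = optimal_num_hash_functions(100, 1024);
    let mut partial_a = SparkBloomFilter::from((k, 1024));
    let mut partial_b = SparkBloomFilter::from((k, 1024));
    partial_a.put_long(1); // update_batch() on partition A
    partial_b.put_long(2); // update_batch() on partition B

    // state() serializes the raw bits; merge_batch() ORs them into the final accumulator.
    let state_b = partial_b.state_as_bytes();
    partial_a.merge_filter(&state_b);
    assert!(partial_a.might_contain_long(1) && partial_a.might_contain_long(2));

    // evaluate() finally emits Spark's version-1 binary encoding.
    let _spark_bytes = partial_a.spark_serialization();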
--- native/proto/src/proto/expr.proto | 4 +--- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 10 +--------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 1ae421c3e..c57de152d 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -196,9 +196,7 @@ message BloomFilterAgg { Expr child = 1; Expr numItems = 2; Expr numBits = 3; - int32 mutableBufferOffset = 4; - int32 inputBufferOffset = 5; - DataType datatype = 6; + DataType datatype = 4; } message Literal { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0c1380d80..bed5877b4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -761,12 +761,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case bloom_filter @ BloomFilterAggregate( - child, - numItems, - numBits, - mutableBufferOffset, - inputBufferOffset) => { + case bloom_filter @ BloomFilterAggregate(child, numItems, numBits, _, _) => val childExpr = exprToProto(child, inputs, binding) val numItemsExpr = exprToProto(numItems, inputs, binding) val numBitsExpr = exprToProto(numBits, inputs, binding) @@ -778,8 +773,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim bloomFilterAggBuilder.setChild(childExpr.get) bloomFilterAggBuilder.setNumItems(numItemsExpr.get) bloomFilterAggBuilder.setNumBits(numBitsExpr.get) - bloomFilterAggBuilder.setMutableBufferOffset(mutableBufferOffset) - bloomFilterAggBuilder.setInputBufferOffset(inputBufferOffset) bloomFilterAggBuilder.setDatatype(dataType.get) Some( @@ -791,7 +784,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(aggExpr, child, numItems, numBits) None } - } case fn => val msg = s"unsupported Spark aggregate function: ${fn.prettyName}" From 88adc7511be83100c09c36c55f47993d058d9063 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 2 Oct 2024 10:15:01 -0400 Subject: [PATCH 11/23] Add BloomFilterAgg benchmark to CometExecBenchmark --- .../sql/benchmark/CometExecBenchmark.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala index 7205484e5..982317851 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala @@ -22,6 +22,9 @@ package org.apache.spark.sql.benchmark import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate import org.apache.spark.sql.internal.SQLConf import org.apache.comet.{CometConf, CometSparkSessionExtensions} @@ -222,6 +225,59 @@ object CometExecBenchmark extends CometBenchmarkBase { } } + // BloomFilterAgg takes an argument for the expected number of distinct values, which determines filter size and + // number of hash functions. 
We use the cardinality as a hint to the aggregate, otherwise the default Spark values + // make a big filter with a lot of hash functions. + def bloomFilterAggregate(values: Int, cardinality: Int): Unit = { + val benchmark = + new Benchmark( + s"BloomFilterAggregate Exec (cardinality $cardinality)", + values, + output = output) + + val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") + spark.sessionState.functionRegistry.registerFunction( + funcId_bloom_filter_agg, + new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"), + (children: Seq[Expression]) => + children.size match { + case 1 => new BloomFilterAggregate(children.head) + case 2 => new BloomFilterAggregate(children.head, children(1)) + case 3 => new BloomFilterAggregate(children.head, children(1), children(2)) + }) + + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable(dir, spark.sql(s"SELECT floor(rand() * $cardinality) as key FROM $tbl")) + + val query = + s"SELECT bloom_filter_agg(cast(key as long), cast($cardinality as long)) FROM parquetV1Table" + + benchmark.addCase("SQL Parquet - Spark (BloomFilterAgg)") { _ => + spark.sql(query).noop() + } + + benchmark.addCase("SQL Parquet - Comet (Scan) (BloomFilterAgg)") { _ => + withSQLConf(CometConf.COMET_ENABLED.key -> "true") { + spark.sql(query).noop() + } + } + + benchmark.addCase("SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + spark.sql(query).noop() + } + } + + benchmark.run() + } + } + + spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg) + } + override def runCometBenchmark(mainArgs: Array[String]): Unit = { runBenchmarkWithTable("Subquery", 1024 * 1024 * 10) { v => subqueryExecBenchmark(v) @@ -240,5 +296,11 @@ object CometExecBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("Sort", 1024 * 1024 * 10) { v => sortExecBenchmark(v) } + + runBenchmarkWithTable("BloomFilterAggregate", 1024 * 1024 * 10) { v => + for (card <- List(100, 1024, 1024 * 1024)) { + bloomFilterAggregate(v, card) + } + } } } From a21e0e362a85b9a5376d4aa034034646a169da0b Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 2 Oct 2024 10:31:50 -0400 Subject: [PATCH 12/23] Docs. --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 ++ spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index bed5877b4..59795ffca 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -762,6 +762,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case bloom_filter @ BloomFilterAggregate(child, numItems, numBits, _, _) => + // We ignore mutableAggBufferOffset and inputAggBufferOffset because they are + // implementation details for Spark's ObjectHashAggregate. 
        val childExpr = exprToProto(child, inputs, binding)
         val numItemsExpr = exprToProto(numItems, inputs, binding)
         val numBitsExpr = exprToProto(numBits, inputs, binding)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 185cc0818..78f59cbea 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -109,7 +109,7 @@ class CometExecSuite extends CometTestBase {
       sql(
         "CREATE VIEW lv_noalias AS SELECT myTab.* FROM src " +
           "LATERAL VIEW explode(map('key1', 100, 'key2', 200)) myTab LIMIT 2")
-      val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key")
+      val df = sql("SELECT * FROM lv_noalias a JOIN lv_noalias b ON a.key=b.key");
       checkSparkAnswer(df)
     }
   }

From 5c5d0f905062815fd3e861cd8f221d0282e52dce Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 10:58:54 -0400
Subject: [PATCH 13/23] API cleanup, fix merge_bits to update cardinality.

---
 .../core/src/execution/datafusion/util/spark_bit_array.rs | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs
index 8f42026c3..4dc1e8061 100644
--- a/native/core/src/execution/datafusion/util/spark_bit_array.rs
+++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs
@@ -77,10 +77,6 @@ impl SparkBitArray {
         Vec::from(self.data.to_byte_slice())
     }

-    pub fn to_bytes_not_vec(&self) -> &[u8] {
-        self.data.to_byte_slice()
-    }
-
     pub fn data(&self) -> Vec<u64> {
         self.data.clone()
     }
@@ -95,6 +91,7 @@
         ) {
             *i.0 |= i.1;
         }
+        self.bit_count = self.data.iter().map(|x| x.count_ones() as usize).sum();
     }
 }

@@ -219,7 +216,7 @@ mod test {
         assert_eq!(array1.cardinality(), fibs.len());
         assert_eq!(array2.cardinality(), primes.len());

-        array1.merge_bits(array2.to_bytes_not_vec());
+        array1.merge_bits(array2.to_bytes().as_slice());

         for n in fibs {
             assert!(array1.get(n));
@@ -228,5 +225,6 @@ mod test {
         for n in primes {
             assert!(array1.get(n));
         }
+        assert_eq!(array1.cardinality(), 60);
     }
 }

From cd107e32db596ce4aabd97adca9faa86b4d434fd Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 11:01:55 -0400
Subject: [PATCH 14/23] Refactor merge_bits to update bit_count with the bit merging.
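
Instead of OR-ing all the words and then making a second pass over self.data to recount
the set bits (the approach in the previous commit), accumulate count_ones() for each
merged word inside the same loop. The meaning of bit_count is unchanged: it is the
population count of the merged array, not the sum of the two inputs' cardinalities. As a
quick worked example, merging a word 0b0110 (2 bits set) with 0b1010 (2 bits set) yields
0b1110, contributing 3 rather than 4 to the new bit_count; the test above that expects a
cardinality of 60 after merging the Fibonacci and prime sets relies on exactly this
overlap behavior.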
--- .../datafusion/util/spark_bit_array.rs | 4 +++- .../CometExecBenchmark-jdk11-results.txt | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 spark/benchmarks/CometExecBenchmark-jdk11-results.txt diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs index 4dc1e8061..7cb68f1f0 100644 --- a/native/core/src/execution/datafusion/util/spark_bit_array.rs +++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs @@ -83,6 +83,7 @@ impl SparkBitArray { pub fn merge_bits(&mut self, other: &[u8]) { assert_eq!(self.byte_size(), other.len()); + let mut bit_count: usize = 0; for i in zip( self.data.iter_mut(), other @@ -90,8 +91,9 @@ impl SparkBitArray { .map(|chunk| u64::from_ne_bytes(chunk.try_into().unwrap())), ) { *i.0 |= i.1; + bit_count += i.0.count_ones() as usize; } - self.bit_count = self.data.iter().map(|x| x.count_ones() as usize).sum(); + self.bit_count = bit_count; } } diff --git a/spark/benchmarks/CometExecBenchmark-jdk11-results.txt b/spark/benchmarks/CometExecBenchmark-jdk11-results.txt new file mode 100644 index 000000000..e3a2c520b --- /dev/null +++ b/spark/benchmarks/CometExecBenchmark-jdk11-results.txt @@ -0,0 +1,17 @@ +================================================================================================ +Subquery +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.24+8-LTS on Mac OS X 15.0 +Apple M3 Max +Subquery: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +SQL Parquet - Spark 0 0 0 28849.9 0.0 1.0X +SQL Parquet - Comet (Scan) 0 0 0 30716.3 0.0 1.1X +SQL Parquet - Comet (Scan, Exec) 0 0 0 30218.3 0.0 1.0X + + +================================================================================================ +Expand +================================================================================================ + From 4f06098f301bbc4b1ff15d45e7e9a5793056db86 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 2 Oct 2024 11:18:29 -0400 Subject: [PATCH 15/23] Remove benchmark results file. 
---
 .../CometExecBenchmark-jdk11-results.txt | 17 -----------------
 1 file changed, 17 deletions(-)
 delete mode 100644 spark/benchmarks/CometExecBenchmark-jdk11-results.txt

diff --git a/spark/benchmarks/CometExecBenchmark-jdk11-results.txt b/spark/benchmarks/CometExecBenchmark-jdk11-results.txt
deleted file mode 100644
index e3a2c520b..000000000
--- a/spark/benchmarks/CometExecBenchmark-jdk11-results.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-================================================================================================
-Subquery
-================================================================================================
-
-OpenJDK 64-Bit Server VM 11.0.24+8-LTS on Mac OS X 15.0
-Apple M3 Max
-Subquery:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-SQL Parquet - Spark                            0              0           0     28849.9           0.0       1.0X
-SQL Parquet - Comet (Scan)                     0              0           0     30716.3           0.0       1.1X
-SQL Parquet - Comet (Scan, Exec)               0              0           0     30218.3           0.0       1.0X
-
-
-================================================================================================
-Expand
-================================================================================================
-

From 79f64681a41d24a071eab3a804a33b8dcf0ee5dc Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 11:47:30 -0400
Subject: [PATCH 16/23] Docs.

---
 .../src/execution/datafusion/expressions/bloom_filter_agg.rs | 4 ++--
 native/core/src/execution/datafusion/util/spark_bit_array.rs | 3 +++
 .../core/src/execution/datafusion/util/spark_bloom_filter.rs | 5 +++--
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
index 4590bcd15..85c22102d 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
@@ -129,8 +129,8 @@ impl Accumulator for SparkBloomFilter {
     }

     fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        // TODO(Matt): There might be a more efficient way to do this by transmuting since calling
-        // state() on an Accumulator is considered destructive.
+        // There might be a more efficient way to do this by transmuting since calling state() on an
+        // Accumulator is considered destructive.
         let state_sv = ScalarValue::Binary(Some(self.state_as_bytes()));
         Ok(vec![state_sv])
     }
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs
index 7cb68f1f0..31a28d7c9 100644
--- a/native/core/src/execution/datafusion/util/spark_bit_array.rs
+++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs
@@ -81,9 +81,12 @@ impl SparkBitArray {
         self.data.clone()
     }

+    // Combines SparkBitArrays, however other is a &[u8] because we anticipate to come from an
+    // Arrow ScalarValue::Binary which is a byte vector underneath, rather than a word vector.
     pub fn merge_bits(&mut self, other: &[u8]) {
         assert_eq!(self.byte_size(), other.len());
         let mut bit_count: usize = 0;
+        // For each word, merge the bits into self, and accumulate a new bit_count.
        for i in zip(
             self.data.iter_mut(),
             other
diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
index bd48ab83b..c5471d649 100644
--- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
+++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
@@ -86,8 +86,7 @@ impl SparkBloomFilter {
     /// Serializes a SparkBloomFilter to a byte array conforming to Spark's BloomFilter
     /// binary format version 1.
     pub fn spark_serialization(&self) -> Vec<u8> {
-        // TODO(Matt): There must be a more efficient way to do this, even with all of the
-        // endianness stuff.
+        // There might be a more efficient way to do this, even with all the endianness stuff.
         let mut spark_bloom_filter: Vec<u8> = 1_u32.to_be_bytes().to_vec();
         spark_bloom_filter.append(&mut self.num_hash_functions.to_be_bytes().to_vec());
         spark_bloom_filter.append(&mut (self.bits.word_size() as u32).to_be_bytes().to_vec());
@@ -95,6 +94,8 @@ impl SparkBloomFilter {
         for i in filter_state.iter_mut() {
             *i = i.to_be();
         }
+        // Does it make sense to do a std::mem::take of filter_state here? Unclear to me if a deep
+        // copy of filter_state as a Vec<u64> to a Vec<u8> is happening here.
         spark_bloom_filter.append(&mut Vec::from(filter_state.to_byte_slice()));
         spark_bloom_filter
     }

From 57fe74202227432a3f27a46ac05f8dd7de0b59bd Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 17:21:01 -0400
Subject: [PATCH 17/23] Add native side benchmarks.

---
 native/core/Cargo.toml                  |   4 +
 native/core/benches/bloom_filter_agg.rs | 160 ++++++++++++++++++++++++
 2 files changed, 164 insertions(+)
 create mode 100644 native/core/benches/bloom_filter_agg.rs

diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 13f6b135f..af93e5abd 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -126,3 +126,7 @@ harness = false
 [[bench]]
 name = "aggregate"
 harness = false
+
+[[bench]]
+name = "bloom_filter_agg"
+harness = false
diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs
new file mode 100644
index 000000000..15d3694a9
--- /dev/null
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder};
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_array::builder::Int64Builder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::ScalarValue;
+use datafusion_execution::TaskContext;
+use datafusion_expr::AggregateUDF;
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+use datafusion_physical_expr::expressions::{Column, Literal};
+use futures::StreamExt;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Runtime;
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("bloom_filter_agg");
+    let num_rows = 8192;
+    let batch = create_record_batch(num_rows);
+    let mut batches = Vec::new();
+    for _ in 0..10 {
+        batches.push(batch.clone());
+    }
+    let partitions = &[batches];
+    let c0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
+    let num_items_sv = ScalarValue::Int64(Some(10 * num_rows as i64));
+    let num_items: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(num_items_sv));
+    let num_bits_sv = ScalarValue::Int64(Some((10 * num_rows * 8) as i64));
+    let num_bits: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(num_bits_sv));
+
+    let rt = Runtime::new().unwrap();
+
+    for agg_mode in [
+        ("partial_agg", AggregateMode::Partial),
+        ("single_agg", AggregateMode::Single),
+    ] {
+        group.bench_function(agg_mode.0, |b| {
+            let comet_bloom_filter_agg =
+                Arc::new(AggregateUDF::new_from_impl(BloomFilterAgg::new(
+                    Arc::clone(&c0),
+                    Arc::clone(&num_items),
+                    Arc::clone(&num_bits),
+                    "bloom_filter_agg",
+                    DataType::Binary,
+                )));
+            b.to_async(&rt).iter(|| {
+                black_box(agg_test(
+                    partitions,
+                    c0.clone(),
+                    comet_bloom_filter_agg.clone(),
+                    "bloom_filter_agg",
+                    agg_mode.1,
+                ))
+            })
+        });
+    }
+
+    group.finish();
+}
+
+async fn agg_test(
+    partitions: &[Vec<RecordBatch>],
+    c0: Arc<dyn PhysicalExpr>,
+    aggregate_udf: Arc<AggregateUDF>,
+    alias: &str,
+    mode: AggregateMode,
+) {
+    let schema = &partitions[0][0].schema();
+    let scan: Arc<dyn ExecutionPlan> =
+        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    let aggregate = create_aggregate(scan, c0.clone(), schema, aggregate_udf, alias, mode);
+    let mut stream = aggregate
+        .execute(0, Arc::new(TaskContext::default()))
+        .unwrap();
+    while let Some(batch) = stream.next().await {
+        let _batch = batch.unwrap();
+    }
+}
+
+fn create_aggregate(
+    scan: Arc<dyn ExecutionPlan>,
+    c0: Arc<dyn PhysicalExpr>,
+    schema: &SchemaRef,
+    aggregate_udf: Arc<AggregateUDF>,
+    alias: &str,
+    mode: AggregateMode,
+) -> Arc<dyn ExecutionPlan> {
+    let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c0.clone()])
+        .schema(schema.clone())
+        .alias(alias)
+        .with_ignore_nulls(false)
+        .with_distinct(false)
+        .build()
+        .unwrap();
+
+    Arc::new(
+        AggregateExec::try_new(
+            mode,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr],
+            vec![None],
+            scan,
+            Arc::clone(schema),
+        )
+        .unwrap(),
+    )
+}
+
+fn create_record_batch(num_rows: usize) -> RecordBatch {
+    let mut int64_builder = Int64Builder::with_capacity(num_rows);
+    for i in 0..num_rows {
+        int64_builder.append_value(i as i64);
+    }
+    let int64_array = Arc::new(int64_builder.finish());
+
+    let mut fields = vec![];
+    let mut columns: Vec<ArrayRef> = vec![];
+
+    // int64 column
+    fields.push(Field::new("c0", DataType::Int64, false));
+    columns.push(int64_array);
+
+    let schema = Schema::new(fields);
+    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
+}
+
+fn config() -> Criterion {
+    Criterion::default()
+        .measurement_time(Duration::from_millis(500))
+        .warm_up_time(Duration::from_millis(500))
+}
+
+criterion_group! {
+    name = benches;
+    config = config();
+    targets = criterion_benchmark
+}
+criterion_main!(benches);

From ec64e4c4e004fde9ef4db5900654102c2f224624 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 18:25:35 -0400
Subject: [PATCH 18/23] Adjust benchmark parameters to match Spark defaults.

---
 native/core/benches/bloom_filter_agg.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs
index 15d3694a9..90e3e3f64 100644
--- a/native/core/benches/bloom_filter_agg.rs
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -45,9 +45,11 @@ fn criterion_benchmark(c: &mut Criterion) {
     }
     let partitions = &[batches];
     let c0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
-    let num_items_sv = ScalarValue::Int64(Some(10 * num_rows as i64));
+    // spark.sql.optimizer.runtime.bloomFilter.expectedNumItems
+    let num_items_sv = ScalarValue::Int64(Some(1000000_i64));
     let num_items: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(num_items_sv));
-    let num_bits_sv = ScalarValue::Int64(Some((10 * num_rows * 8) as i64));
+    //spark.sql.optimizer.runtime.bloomFilter.numBits
+    let num_bits_sv = ScalarValue::Int64(Some(8388608_i64));
     let num_bits: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(num_bits_sv));

     let rt = Runtime::new().unwrap();

From 7a81f3593dd0c52d637f7af9d9e96346e18f6ab3 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 2 Oct 2024 19:06:47 -0400
Subject: [PATCH 19/23] Address review feedback.

---
 .../src/execution/datafusion/expressions/bloom_filter_agg.rs | 2 +-
 native/core/src/execution/datafusion/util/spark_bit_array.rs | 3 ++-
 .../core/src/execution/datafusion/util/spark_bloom_filter.rs | 2 --
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
index 85c22102d..f1a102091 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
@@ -114,7 +114,7 @@ impl Accumulator for SparkBloomFilter {
             if let ScalarValue::Int64(Some(value)) = v {
                 self.put_long(value);
             } else {
-                unreachable!("")
+                unreachable!()
             }
             Ok(())
         })
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs
index 31a28d7c9..68b97d660 100644
--- a/native/core/src/execution/datafusion/util/spark_bit_array.rs
+++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
+use crate::common::bit; use arrow_buffer::ToByteSlice; use std::iter::zip; @@ -101,7 +102,7 @@ impl SparkBitArray { } pub fn num_words(num_bits: i32) -> i32 { - (num_bits as f64 / 64.0).ceil() as i32 + bit::ceil(num_bits as usize, 64) as i32 } #[cfg(test)] diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs index c5471d649..22a84d854 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/datafusion/util/spark_bloom_filter.rs @@ -33,8 +33,6 @@ pub struct SparkBloomFilter { num_hash_functions: u32, } -static DEFAULT_FPP: f64 = 0.03; - pub fn optimal_num_hash_functions(expected_items: i32, num_bits: i32) -> i32 { cmp::max( 1, From 3347923502205038008645ec6bbd7ac7df149bad Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 4 Oct 2024 09:55:57 -0400 Subject: [PATCH 20/23] Add assertion to merge_batch. --- .../execution/datafusion/expressions/bloom_filter_agg.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs index f1a102091..682895a92 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs @@ -136,6 +136,13 @@ impl Accumulator for SparkBloomFilter { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + assert_eq!( + states.len(), + 1, + "Expect one element in 'states' but found {}", + states.len() + ); + assert_eq!(states[0].len(), 1); let state_sv = downcast_value!(states[0], BinaryArray); self.merge_filter(state_sv.value_data()); Ok(()) From 1ed99e339c5bb943e7e8c9dd68961b7f1b8e3df8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 17 Oct 2024 12:48:06 -0400 Subject: [PATCH 21/23] Address some review feedback. 
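
The benchmark registration now always uses the two-argument BloomFilterAggregate
constructor because the benchmark query always supplies an explicit item-count hint,
e.g. (quoting the query built earlier in CometExecBenchmark):

    spark.sql(
      s"SELECT bloom_filter_agg(cast(key as long), cast($cardinality as long)) FROM parquetV1Table")

so the one- and three-argument variants registered before were never exercised here.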
---
 .../expressions/bloom_filter_agg.rs        |  1 +
 .../sql/benchmark/CometExecBenchmark.scala | 41 ++++++++-----------
 2 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
index 682895a92..ed64b80e7 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
@@ -41,6 +41,7 @@ pub struct BloomFilterAgg {
     num_bits: i32,
 }

+#[inline]
 fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) -> i32 {
     match expr.as_any().downcast_ref::<Literal>().unwrap().value() {
         ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32,
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
index 982317851..3dd930f67 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
@@ -239,12 +239,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
     spark.sessionState.functionRegistry.registerFunction(
       funcId_bloom_filter_agg,
       new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
-      (children: Seq[Expression]) =>
-        children.size match {
-          case 1 => new BloomFilterAggregate(children.head)
-          case 2 => new BloomFilterAggregate(children.head, children(1))
-          case 3 => new BloomFilterAggregate(children.head, children(1), children(2))
-        })
+      (children: Seq[Expression]) => new BloomFilterAggregate(children.head, children(1)))

     withTempPath { dir =>
       withTempTable("parquetV1Table") {
@@ -279,23 +274,23 @@ object CometExecBenchmark extends CometBenchmarkBase {
   }

   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    runBenchmarkWithTable("Subquery", 1024 * 1024 * 10) { v =>
-      subqueryExecBenchmark(v)
-    }
-
-    runBenchmarkWithTable("Expand", 1024 * 1024 * 10) { v =>
-      expandExecBenchmark(v)
-    }
-
-    runBenchmarkWithTable("Project + Filter", 1024 * 1024 * 10) { v =>
-      for (fractionOfZeros <- List(0.0, 0.50, 0.95)) {
-        numericFilterExecBenchmark(v, fractionOfZeros)
-      }
-    }
-
-    runBenchmarkWithTable("Sort", 1024 * 1024 * 10) { v =>
-      sortExecBenchmark(v)
-    }
+//    runBenchmarkWithTable("Subquery", 1024 * 1024 * 10) { v =>
+//      subqueryExecBenchmark(v)
+//    }
+//
+//    runBenchmarkWithTable("Expand", 1024 * 1024 * 10) { v =>
+//      expandExecBenchmark(v)
+//    }
+//
+//    runBenchmarkWithTable("Project + Filter", 1024 * 1024 * 10) { v =>
+//      for (fractionOfZeros <- List(0.0, 0.50, 0.95)) {
+//        numericFilterExecBenchmark(v, fractionOfZeros)
+//      }
+//    }
+//
+//    runBenchmarkWithTable("Sort", 1024 * 1024 * 10) { v =>
+//      sortExecBenchmark(v)
+//    }

     runBenchmarkWithTable("BloomFilterAggregate", 1024 * 1024 * 10) { v =>
       for (card <- List(100, 1024, 1024 * 1024)) {

From d41a9d24c5e411bf5517720a363d06a3003613a8 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 18 Oct 2024 10:51:22 -0400
Subject: [PATCH 22/23] Only generate native BloomFilterAgg if child has LongType.
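
Spark 3.4's bloom_filter_agg only accepts a BIGINT child (3.5 adds more input types),
and in Spark's runtime bloom-filter join rewrite the child handed to this aggregate is
typically already a 64-bit hash of the join key, so requiring LongType covers the
practical cases. Illustrative only, assuming the function is registered as in the
benchmark above: a query such as

    spark.sql(
      "SELECT bloom_filter_agg(cast(key as long), cast(100 as long)) FROM parquetV1Table")

keeps the native path, while an un-cast INT key would fail the new check and fall back
to Spark through withInfo.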
--- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index bb59d8a7e..3ed5d66e2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -769,8 +769,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val numBitsExpr = exprToProto(numBits, inputs, binding) val dataType = serializeDataType(bloom_filter.dataType) - if (childExpr.isDefined && numItemsExpr.isDefined && - numBitsExpr.isDefined && dataType.isDefined) { + if (childExpr.isDefined && + child.dataType + .isInstanceOf[LongType] && // Spark 3.4 only supports Long, 3.5+ adds more types. + numItemsExpr.isDefined && + numBitsExpr.isDefined && + dataType.isDefined) { val bloomFilterAggBuilder = ExprOuterClass.BloomFilterAgg.newBuilder() bloomFilterAggBuilder.setChild(childExpr.get) bloomFilterAggBuilder.setNumItems(numItemsExpr.get) From 6d13890e7c0eb855eb227079da81469ac91b2d8f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 18 Oct 2024 11:03:33 -0400 Subject: [PATCH 23/23] Add TODO with GitHub issue link. --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3ed5d66e2..3805d418b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -769,9 +769,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val numBitsExpr = exprToProto(numBits, inputs, binding) val dataType = serializeDataType(bloom_filter.dataType) + // TODO: Support more types + // https://github.com/apache/datafusion-comet/issues/1023 if (childExpr.isDefined && child.dataType - .isInstanceOf[LongType] && // Spark 3.4 only supports Long, 3.5+ adds more types. + .isInstanceOf[LongType] && numItemsExpr.isDefined && numBitsExpr.isDefined && dataType.isDefined) {