From 67ab9684f236cccc68802f425b34d5f97795f82c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 27 Jul 2024 10:55:01 -0600 Subject: [PATCH 01/15] add some documentation --- .../scala/org/apache/comet/CometConf.scala | 7 + docs/source/user-guide/configs.md | 1 + .../datafusion/expressions/strings.rs | 2 - .../core/src/execution/datafusion/planner.rs | 13 +- native/proto/src/proto/expr.proto | 10 +- native/spark-expr/src/lib.rs | 2 + native/spark-expr/src/regexp.rs | 147 ++++++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 57 ++++--- .../apache/comet/CometExpressionSuite.scala | 59 +++++++ 9 files changed, 272 insertions(+), 26 deletions(-) create mode 100644 native/spark-expr/src/regexp.rs diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 5c3ebf6fb..07972cc6a 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -410,6 +410,13 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = + conf("spark.comet.regexp.allowIncompatible") + .doc("Comet is not currently fully compatible with Spark for all regular expressions. " + + "Set this config to true to allow them anyway using Rust's regular expression engine. " + + "See compatibility guide for more information.") + .booleanConf + .createWithDefault(false) } object ConfigHelpers { diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8a0f2440a..7c449fa82 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -44,6 +44,7 @@ Comet provides the following configuration settings. | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false | +| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | | spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan | | spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. 
| false |
diff --git a/native/core/src/execution/datafusion/expressions/strings.rs b/native/core/src/execution/datafusion/expressions/strings.rs
index f7f5b02e8..9112c256b 100644
--- a/native/core/src/execution/datafusion/expressions/strings.rs
+++ b/native/core/src/execution/datafusion/expressions/strings.rs
@@ -143,8 +143,6 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);
 
 make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);
 
-// make_predicate_function!(RLike, rlike_dyn, rlike_utf8_scalar_dyn);
-
 #[derive(Debug, Hash)]
 pub struct SubstringExpr {
     pub child: Arc<dyn PhysicalExpr>,
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index f3bc146d3..c2165455c 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -59,6 +59,7 @@ use datafusion_expr::expr::find_df_window_func;
 use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
+use datafusion_physical_expr_common::expressions::Literal;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
@@ -108,7 +109,7 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
+    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr,
 };
 
 // For clippy error on type_complexity.
@@ -447,6 +448,16 @@ impl PhysicalPlanner {
                 Ok(Arc::new(Like::new(left, right)))
             }
+            ExprStruct::Rlike(expr) => {
+                let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
+                let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
+                match right.as_any().downcast_ref::<Literal>().unwrap().value() {
+                    ScalarValue::Utf8(Some(pattern)) => {
+                        Ok(Arc::new(RLike::try_new(left, &pattern)?))
+                    }
+                    _ => todo!(),
+                }
+            }
             ExprStruct::CheckOverflow(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
                 let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 56518d9ee..158356dfa 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -54,7 +54,7 @@ message Expr {
     StartsWith startsWith = 27;
     EndsWith endsWith = 28;
     Contains contains = 29;
-    // RLike rlike = 30;
+    RLike rlike = 30;
     ScalarFunc scalarFunc = 31;
     EqualNullSafe eqNullSafe = 32;
     NotEqualNullSafe neqNullSafe = 33;
@@ -368,10 +368,10 @@ message Like {
   Expr right = 2;
 }
 
-// message RLike {
-//   Expr left = 1;
-//   Expr right = 2;
-// }
+message RLike {
+  Expr left = 1;
+  Expr right = 2;
+}
 
 message StartsWith {
   Expr left = 1;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 22628978d..d83d6496d 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -20,6 +20,7 @@
 mod error;
 mod if_expr;
 mod kernels;
+mod regexp;
 mod temporal;
 pub mod timezone;
 pub mod utils;
@@ -27,6 +28,7 @@
 pub use cast::{spark_cast, Cast};
 pub use error::{SparkError, SparkResult};
 pub use if_expr::IfExpr;
+pub use regexp::RLike;
 pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
 
 /// Spark supports three evaluation modes when evaluating expressions, which affect
diff --git a/native/spark-expr/src/regexp.rs b/native/spark-expr/src/regexp.rs
new file mode 100644
index 000000000..254dc03f3
--- /dev/null
+++ b/native/spark-expr/src/regexp.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::utils::down_cast_any_ref;
+use crate::SparkError;
+use arrow_array::builder::BooleanBuilder;
+use arrow_array::{Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::{internal_err, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use regex::Regex;
+use std::any::Any;
+use std::fmt::{Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+/// Implementation of RLIKE operator.
+///
+/// Note that this implementation is not yet Spark-compatible and simply delegates to
+/// the Rust regexp crate. It will match Spark behavior for some simple cases but has
+/// differences in whitespace handling and does not support all the features of Java's
+/// regular expression engine, which are documented at:
+///
+/// https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html
+#[derive(Debug)]
+pub struct RLike {
+    child: Arc<dyn PhysicalExpr>,
+    // Only scalar patterns are supported
+    pattern_str: String,
+    pattern: Regex,
+}
+
+impl Hash for RLike {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write(self.pattern_str.as_bytes());
+    }
+}
+
+impl RLike {
+    pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
+        Ok(Self {
+            child,
+            pattern_str: pattern.to_string(),
+            pattern: Regex::new(pattern).map_err(|e| {
+                SparkError::Internal(format!("Failed to compile pattern {}: {}", pattern, e))
+            })?,
+        })
+    }
+}
+
+impl Display for RLike {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "RLike [child: {}, pattern: {}] ",
+            self.child, self.pattern_str
+        )
+    }
+}
+
+impl PartialEq<dyn Any> for RLike {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| self.child.eq(&x.child) && self.pattern_str.eq(&x.pattern_str))
+            .unwrap_or(false)
+    }
+}
+
+impl PhysicalExpr for RLike {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.child.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        match self.child.evaluate(batch)? {
+            ColumnarValue::Array(array) => {
+                let inputs = array
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("string array");
+                let mut builder = BooleanBuilder::with_capacity(inputs.len());
+                if inputs.is_nullable() {
+                    for i in 0..inputs.len() {
+                        if inputs.is_null(i) {
+                            builder.append_null();
+                        } else {
+                            builder.append_value(self.pattern.is_match(inputs.value(i)));
+                        }
+                    }
+                } else {
+                    for i in 0..inputs.len() {
+                        builder.append_value(self.pattern.is_match(inputs.value(i)));
+                    }
+                }
+                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+            }
+            ColumnarValue::Scalar(_) => {
+                internal_err!("non scalar regexp patterns are not supported")
+            }
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert!(children.len() == 1);
+        Ok(Arc::new(RLike::try_new(
+            children[0].clone(),
+            &self.pattern_str,
+        )?))
+    }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        use std::hash::Hash;
+        let mut s = state;
+        self.hash(&mut s);
+    }
+}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index e06405a1a..25dc79234 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1235,25 +1235,46 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           None
         }
 
-      // TODO waiting for arrow-rs update
-//      case RLike(left, right) =>
-//        val leftExpr = exprToProtoInternal(left, inputs)
-//        val rightExpr = exprToProtoInternal(right, inputs)
-//
-//        if (leftExpr.isDefined && rightExpr.isDefined) {
-//          val builder = ExprOuterClass.RLike.newBuilder()
-//          builder.setLeft(leftExpr.get)
-//          builder.setRight(rightExpr.get)
-//
-//          Some(
-//            ExprOuterClass.Expr
-//              .newBuilder()
-//              .setRlike(builder)
-//              .build())
-//        } else {
-//          None
-//        }
+      case RLike(left, right) =>
+        // for now, we assume that all regular expressions are incompatible with Spark but
+        // later we can add logic to determine if a pattern will produce the same results
+        // in Rust, or even transpile the pattern to work around differences between the JVM
+        // and Rust regular expression engines
+        if (CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+
+          // we currently only support scalar regex patterns
+          right match {
+            case Literal(_, DataTypes.StringType) =>
+            // supported
+            case _ =>
+              withInfo(expr, "Only scalar regexp patterns are supported")
+              return None
+          }
+
+          val leftExpr = exprToProtoInternal(left, inputs)
+          val rightExpr = exprToProtoInternal(right, inputs)
+
+          if (leftExpr.isDefined && rightExpr.isDefined) {
+            val builder = ExprOuterClass.RLike.newBuilder()
+            builder.setLeft(leftExpr.get)
+            builder.setRight(rightExpr.get)
+
+            Some(
+              ExprOuterClass.Expr
+                .newBuilder()
+                .setRlike(builder)
+                .build())
+          } else {
+            withInfo(expr, left, right)
+            None
+          }
+        } else {
+          withInfo(
+            expr,
+            "Regular expressions are disabled. 
" + + s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true to enable them.") + None + } case StartsWith(left, right) => val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index c22c6b06a..cdc27d5ba 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -20,9 +20,11 @@ package org.apache.comet import java.time.{Duration, Period} +import java.util.regex.Pattern import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import scala.util.{Random, Try} import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} @@ -617,6 +619,63 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("rlike") { + val table = "rlike_names" + val gen = new DataGenerator(new Random(42)) + Seq(false, true).foreach { withDictionary => + val data = Seq("James Smith", "Michael Rose", "Rames Rose", "Rames rose") ++ + gen.generateStrings(100, "rames Rose", 12) + withParquetFile(data.zipWithIndex, withDictionary) { file => + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + spark.read.parquet(file).createOrReplaceTempView(table) + val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table") + checkSparkAnswerAndOperator(query) + } + } + } + } + + test("rlike fallback for non scalar pattern") { + val table = "rlike_fallback" + withTable(table) { + sql(s"create table $table(id int, name varchar(20)) using parquet") + sql(s"insert into $table values(1,'James Smith')") + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + val query2 = sql(s"select id from $table where name rlike name") + val (_, cometPlan) = checkSparkAnswer(query2) + val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) + assert(explain.contains("Only scalar regexp patterns are supported")) + } + } + } + + // this test demonstrates that Comet is not currently compatible + // with Spark for regular expressions + ignore("rlike fuzz test") { + val table = "rlike_fuzz" + val gen = new DataGenerator(new Random(42)) + withTable(table) { + sql(s"create table $table(id int, name varchar(20)) using parquet") + + val dataChars = "[]$^-=*09azAZ$\r\n\t abc123" + gen.generateStrings(1000, dataChars, 6).zipWithIndex.foreach { x => + sql(s"insert into $table values(${x._2}, '${x._1}')") + } + + val patternChars = "[]$^-=*09azAZ$\r\n\t " + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + val validPatterns = gen + .generateStrings(1000, patternChars, 6) + .filter(pattern => Try(Pattern.compile(pattern)).isSuccess) + assert(validPatterns.nonEmpty) + validPatterns.foreach { pattern => + val query = sql(s"select id, name, name rlike '$pattern' from $table") + checkSparkAnswerAndOperator(query) + } + } + } + } + test("contains") { val table = "names" withTable(table) { From 3fec3e4063ed7d5e15c9032e368343cfc14ac8ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 28 Jul 2024 08:54:10 -0600 Subject: [PATCH 02/15] add another test --- .../apache/comet/CometExpressionSuite.scala | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index cdc27d5ba..fa087fe06 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -649,9 +649,35 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("rlike fuzz test") {
+    val table = "rlike_fuzz"
+    val gen = new DataGenerator(new Random(42))
+    withTable(table) {
+      sql(s"create table $table(id int, name varchar(20)) using parquet")
+
+      val dataChars = "[]$^-=*09azAZ$\t abc123"
+      gen.generateStrings(100, dataChars, 6).zipWithIndex.foreach { x =>
+        sql(s"insert into $table values(${x._2}, '${x._1}')")
+      }
+
+      val patternChars = "[]$^-=*09azAZ$\t "
+      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+        val validPatterns = gen
+          .generateStrings(100, patternChars, 6)
+          .filter(pattern => Try(Pattern.compile(pattern)).isSuccess)
+        assert(validPatterns.nonEmpty)
+        validPatterns.foreach { pattern =>
+          val query = sql(s"select id, name, name rlike '$pattern' from $table")
+          checkSparkAnswerAndOperator(query)
+        }
+      }
+    }
+  }
+
   // this test demonstrates that Comet is not currently compatible
-  // with Spark for regular expressions
-  ignore("rlike fuzz test") {
+  // with Spark for regular expressions, especially when the data
+  // contains newline characters
+  ignore("rlike fuzz test failing cases") {
     val table = "rlike_fuzz"
     val gen = new DataGenerator(new Random(42))
     withTable(table) {

From 51c2f0d110bd61d5c9ebf61ed90262d7a58e254d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 28 Jul 2024 09:32:42 -0600
Subject: [PATCH 03/15] prepare for review

---
 .../org/apache/comet/expressions/RegExp.scala | 32 ++++++
 .../apache/comet/serde/QueryPlanSerde.scala   | 60 +++++------
 .../apache/comet/CometExpressionSuite.scala   | 99 +++++++++++--------
 3 files changed, 115 insertions(+), 76 deletions(-)
 create mode 100644 spark/src/main/scala/org/apache/comet/expressions/RegExp.scala

diff --git a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
new file mode 100644
index 000000000..8dc0c828c
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.comet.expressions + +object RegExp { + + /** Determine whether the regexp pattern is supported natively and compatible with Spark */ + def isSupportedPattern(pattern: String): Boolean = { + // this is a placeholder for implementing logic to determine if the pattern + // is known to be compatible with Spark, so that we can enable regexp automatically + // for common cases and fallback to Spark for more complex cases + false + } + +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 25dc79234..ccf881b73 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -44,7 +44,7 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark34Plus, withInfo} -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, Unsupported} +import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, RegExp, Unsupported} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator} @@ -1236,43 +1236,37 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case RLike(left, right) => - // for now, we assume that all regular expressions are incompatible with Spark but - // later we can add logic to determine if a pattern will produce the same results - // in Rust, or even transpile the pattern to work around differences between the JVM - // and Rust regular expression engines - if (CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { - - // we currently only support scalar regex patterns - right match { - case Literal(_, DataTypes.StringType) => - // supported - case _ => - withInfo(expr, "Only scalar regexp patterns are supported") + // we currently only support scalar regex patterns + right match { + case Literal(pattern, DataTypes.StringType) => + if (!RegExp.isSupportedPattern(pattern.toString) && + !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { + withInfo( + expr, + s"Regexp pattern $pattern is not compatible with Spark. 
" + + s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true to allow it anyway.") return None - } + } + case _ => + withInfo(expr, "Only scalar regexp patterns are supported") + return None + } - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) - if (leftExpr.isDefined && rightExpr.isDefined) { - val builder = ExprOuterClass.RLike.newBuilder() - builder.setLeft(leftExpr.get) - builder.setRight(rightExpr.get) + if (leftExpr.isDefined && rightExpr.isDefined) { + val builder = ExprOuterClass.RLike.newBuilder() + builder.setLeft(leftExpr.get) + builder.setRight(rightExpr.get) - Some( - ExprOuterClass.Expr - .newBuilder() - .setRlike(builder) - .build()) - } else { - withInfo(expr, left, right) - None - } + Some( + ExprOuterClass.Expr + .newBuilder() + .setRlike(builder) + .build()) } else { - withInfo( - expr, - "Regular expressions are disabled. " + - s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true to enable them.") + withInfo(expr, left, right) None } case StartsWith(left, right) => diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index fa087fe06..3e0c2efdc 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -20,11 +20,10 @@ package org.apache.comet import java.time.{Duration, Period} -import java.util.regex.Pattern import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag -import scala.util.{Random, Try} +import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} @@ -619,7 +618,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("rlike") { + test("rlike simple case") { val table = "rlike_names" val gen = new DataGenerator(new Random(42)) Seq(false, true).foreach { withDictionary => @@ -649,54 +648,68 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("rlike fuzz test") { - val table = "rlike_fuzz" - val gen = new DataGenerator(new Random(42)) - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - - val dataChars = "[]$^-=*09azAZ$\t abc123" - gen.generateStrings(100, dataChars, 6).zipWithIndex.foreach { x => - sql(s"insert into $table values(${x._2}, '${x._1}')") - } - - val patternChars = "[]$^-=*09azAZ$\t " - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { - val validPatterns = gen - .generateStrings(100, patternChars, 6) - .filter(pattern => Try(Pattern.compile(pattern)).isSuccess) - assert(validPatterns.nonEmpty) - validPatterns.foreach { pattern => - val query = sql(s"select id, name, name rlike '$pattern' from $table") - checkSparkAnswerAndOperator(query) - } - } - } - } - - // this test demonstrates that Comet is not currently compatible - // with Spark for regular expressions, especially when the data - // contains newline characters - ignore("rlike fuzz test failing cases") { + test("rlike") { val table = "rlike_fuzz" val gen = new DataGenerator(new Random(42)) withTable(table) { + // generate some data + // newline characters are intentionally omitted for now + val dataChars = "\t abc123" sql(s"create table $table(id int, name varchar(20)) using parquet") - - val dataChars = "[]$^-=*09azAZ$\r\n\t 
abc123" gen.generateStrings(1000, dataChars, 6).zipWithIndex.foreach { x => sql(s"insert into $table values(${x._2}, '${x._1}')") } - val patternChars = "[]$^-=*09azAZ$\r\n\t " - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { - val validPatterns = gen - .generateStrings(1000, patternChars, 6) - .filter(pattern => Try(Pattern.compile(pattern)).isSuccess) - assert(validPatterns.nonEmpty) - validPatterns.foreach { pattern => - val query = sql(s"select id, name, name rlike '$pattern' from $table") - checkSparkAnswerAndOperator(query) + // test some common cases - this is far from comprehensive + // see https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html + // for all valid patterns in Java's regexp engine + // + // patterns not currently covered: + // - octal values + // - hex values + // - specific character matches + // - specific whitespace/newline matches + // - complex character classes (union, intersection, subtraction) + // - POSIX character classes + // - java.lang.Character classes + // - Classes for Unicode scripts, blocks, categories and binary properties + // - reluctant quantifiers + // - possessive quantifiers + // - logical operators + // - back-references + // - quotations + // - special constructs (name capturing and non-capturing) + val startPatterns = Seq("", "^", "\\A") + val endPatterns = Seq("", "$", "\\Z", "\\z") + val patternParts = Seq( + "[0-9]", + "[a-z]", + "[^a-z]", + "\\d", + "\\D", + "\\w", + "\\W", + "\\b", + "\\B", + "\\h", + "\\H", + "\\s", + "\\S", + "\\v", + "\\V") + val qualifiers = Seq("", "+", "*", "?", "{1,}") + + for (start <- startPatterns) { + for (end <- endPatterns) { + for (part <- patternParts) { + for (qualifier <- qualifiers) { + val pattern = start + part + qualifier + end + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + val query = sql(s"select id, name, name rlike '$pattern' from $table") + checkSparkAnswerAndOperator(query) + } + } + } } } } From 7f470922758e9e656bc55440cb6e5b47afbde184 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 28 Jul 2024 09:36:39 -0600 Subject: [PATCH 04/15] remove todo --- native/core/src/execution/datafusion/planner.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index c2165455c..f277794d3 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -455,7 +455,9 @@ impl PhysicalPlanner { ScalarValue::Utf8(Some(pattern)) => { Ok(Arc::new(RLike::try_new(left, &pattern)?)) } - _ => todo!(), + _ => Err(ExecutionError::GeneralError( + "RLike only supports scalar patterns".to_string(), + )), } } ExprStruct::CheckOverflow(expr) => { From a871beac97fbf59f40a7c6b3ffb7473d372bf156 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 28 Jul 2024 09:39:32 -0600 Subject: [PATCH 05/15] docs --- docs/source/user-guide/compatibility.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index a16fd1b21..574f56d45 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -32,6 +32,12 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. 
+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast
 
 Cast operations in Comet fall into three levels of support:

From 10dea84a4711a34fe134a006e91104bc030ff352 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 28 Jul 2024 09:45:40 -0600
Subject: [PATCH 06/15] clippy

---
 native/core/src/execution/datafusion/planner.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index 7462f742f..02ebe2755 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -453,7 +453,7 @@ impl PhysicalPlanner {
                 let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
                 match right.as_any().downcast_ref::<Literal>().unwrap().value() {
                     ScalarValue::Utf8(Some(pattern)) => {
-                        Ok(Arc::new(RLike::try_new(left, &pattern)?))
+                        Ok(Arc::new(RLike::try_new(left, pattern)?))
                     }
                     _ => Err(ExecutionError::GeneralError(
                         "RLike only supports scalar patterns".to_string(),

From eb462bb1a17e9538266dbf3709f574133ccb9353 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 28 Jul 2024 10:42:23 -0600
Subject: [PATCH 07/15] formatting

---
 .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ccf881b73..85ae56ee6 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1244,7 +1244,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
               withInfo(
                 expr,
                 s"Regexp pattern $pattern is not compatible with Spark. 
" + - s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true to allow it anyway.") + s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " + + "to allow it anyway.") return None } case _ => From e966b0782a038ad8a48ec045c06d82c62f530d74 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 29 Jul 2024 09:03:39 -0600 Subject: [PATCH 08/15] test a subset of patterns --- .../apache/comet/CometExpressionSuite.scala | 23 +++++++++---------- .../org/apache/comet/DataGenerator.scala | 5 ++++ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 3e0c2efdc..64c34cf57 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -656,7 +656,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // newline characters are intentionally omitted for now val dataChars = "\t abc123" sql(s"create table $table(id int, name varchar(20)) using parquet") - gen.generateStrings(1000, dataChars, 6).zipWithIndex.foreach { x => + gen.generateStrings(100, dataChars, 6).zipWithIndex.foreach { x => sql(s"insert into $table values(${x._2}, '${x._1}')") } @@ -699,17 +699,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { "\\V") val qualifiers = Seq("", "+", "*", "?", "{1,}") - for (start <- startPatterns) { - for (end <- endPatterns) { - for (part <- patternParts) { - for (qualifier <- qualifiers) { - val pattern = start + part + qualifier + end - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { - val query = sql(s"select id, name, name rlike '$pattern' from $table") - checkSparkAnswerAndOperator(query) - } - } - } + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + // testing every possible combination takes too long, so we pick some + // random combinations + for (_ <- 0 until 100) { + val pattern = gen.pickRandom(startPatterns) + + gen.pickRandom(patternParts) + + gen.pickRandom(qualifiers) + + gen.pickRandom(endPatterns) + val query = sql(s"select id, name, name rlike '$pattern' from $table") + checkSparkAnswerAndOperator(query) } } } diff --git a/spark/src/test/scala/org/apache/comet/DataGenerator.scala b/spark/src/test/scala/org/apache/comet/DataGenerator.scala index 80e7c2288..443a058bc 100644 --- a/spark/src/test/scala/org/apache/comet/DataGenerator.scala +++ b/spark/src/test/scala/org/apache/comet/DataGenerator.scala @@ -36,6 +36,11 @@ object DataGenerator { class DataGenerator(r: Random) { import DataGenerator._ + /** Pick a random item from a sequence */ + def pickRandom[T](items: Seq[T]): T = { + items(r.nextInt(items.length)) + } + /** Generate a random string using the specified characters */ def generateString(chars: String, maxLen: Int): String = { val len = r.nextInt(maxLen) From e5599528824b1fc435740ef7b351aafee9e775e2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 29 Jul 2024 11:31:34 -0600 Subject: [PATCH 09/15] Add another test --- .../org/apache/comet/expressions/RegExp.scala | 7 +++ .../apache/comet/CometExpressionSuite.scala | 53 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala index 8dc0c828c..1afacad27 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala +++ 
b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala @@ -29,4 +29,11 @@ object RegExp { false } + def escape(pattern: String): String = pattern.map { + case '\t' => "\\t" + case '\r' => "\\r" + case '\n' => "\\n" + case other => other + }.mkString + } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 64c34cf57..3a6605387 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types.{Decimal, DecimalType} import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus} +import org.apache.comet.expressions.RegExp class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -648,6 +649,58 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("rlike whitespace") { + val table = "rlike_whitespace" + withTable(table) { + sql(s"create table $table(id int, name varchar(20)) using parquet") + val values = + Seq("James Smith", "\rJames\rSmith\r", "\nJames\nSmith\n", "\r\nJames\r\nSmith\r\n") + values.zipWithIndex.foreach { x => + sql(s"insert into $table values (${x._2}, '${x._1}')") + } + val patterns = Seq( + "James", + "J[a-z]mes", + "^James", + "\\AJames", + "Smith", + "James$", + "James\\Z", + "James\\z", + "^Smith", + "\\ASmith", + // $ produces different results - we could potentially transpile this to a different + // expression or just fall back to Spark for this case + // "Smith$", + "Smith\\Z", + "Smith\\z") + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + patterns.foreach { pattern => + val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table") + checkSparkAnswerAndOperator(query2) + } + } + } + } + + protected def checkRegexpAnswer(df: => DataFrame): Unit = { + var expected: Array[Row] = Array.empty + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + expected = df.collect() + } + val actual = df.collect() + assert(actual.length == expected.length) + actual.zip(expected).foreach { case (a, b) => + val l = RegExp.escape(a.mkString(",")) + val r = RegExp.escape(b.mkString(",")) + // scalastyle:off println + println(l + " === " + r) + // scalastyle:on println + assert(l === r) + } + + } + test("rlike") { val table = "rlike_fuzz" val gen = new DataGenerator(new Random(42)) From f97543266e3c325eea25cc5cf1835388c19995fd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 29 Jul 2024 11:32:33 -0600 Subject: [PATCH 10/15] remove unused methods --- .../org/apache/comet/expressions/RegExp.scala | 7 ------- .../apache/comet/CometExpressionSuite.scala | 18 ------------------ 2 files changed, 25 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala index 1afacad27..8dc0c828c 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala @@ -29,11 +29,4 @@ object RegExp { false } - def escape(pattern: String): String = pattern.map { - case '\t' => "\\t" - case '\r' => "\\r" - case '\n' => "\\n" - case other => other - }.mkString - } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 3a6605387..a178f7def 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -683,24 +683,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - protected def checkRegexpAnswer(df: => DataFrame): Unit = { - var expected: Array[Row] = Array.empty - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - expected = df.collect() - } - val actual = df.collect() - assert(actual.length == expected.length) - actual.zip(expected).foreach { case (a, b) => - val l = RegExp.escape(a.mkString(",")) - val r = RegExp.escape(b.mkString(",")) - // scalastyle:off println - println(l + " === " + r) - // scalastyle:on println - assert(l === r) - } - - } - test("rlike") { val table = "rlike_fuzz" val gen = new DataGenerator(new Random(42)) From 3a21ca5c4cba7a9335aa81ba140ff8f749901dee Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 29 Jul 2024 15:15:07 -0600 Subject: [PATCH 11/15] remove unused import --- spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index a178f7def..4a986b8c9 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types.{Decimal, DecimalType} import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus} -import org.apache.comet.expressions.RegExp class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ From 2bd94955fd2bfcad6fc5f5ef5058465faa28cbcb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 31 Jul 2024 17:02:30 -0600 Subject: [PATCH 12/15] add dictionary support --- native/spark-expr/src/regexp.rs | 55 +++++++++++++------ .../CometAggregateBenchmark-jdk11-results.txt | 24 ++++++++ .../apache/comet/CometExpressionSuite.scala | 4 +- 3 files changed, 65 insertions(+), 18 deletions(-) create mode 100644 spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt diff --git a/native/spark-expr/src/regexp.rs b/native/spark-expr/src/regexp.rs index 254dc03f3..2672d754f 100644 --- a/native/spark-expr/src/regexp.rs +++ b/native/spark-expr/src/regexp.rs @@ -17,8 +17,10 @@ use crate::utils::down_cast_any_ref; use crate::SparkError; +use arrow::compute::take; use arrow_array::builder::BooleanBuilder; -use arrow_array::{Array, RecordBatch, StringArray}; +use arrow_array::types::Int32Type; +use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Schema}; use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; @@ -61,6 +63,24 @@ impl RLike { })?, }) } + + fn is_match(&self, inputs: &StringArray) -> BooleanArray { + let mut builder = BooleanBuilder::with_capacity(inputs.len()); + if inputs.is_nullable() { + for i in 0..inputs.len() { + if inputs.is_null(i) { + builder.append_null(); + } else { + builder.append_value(self.pattern.is_match(inputs.value(i))); + } + } + } else { + for i in 0..inputs.len() { + builder.append_value(self.pattern.is_match(inputs.value(i))); + } + } + builder.finish() + } } impl Display for RLike { @@ -97,26 +117,29 @@ 
impl PhysicalExpr for RLike {
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         match self.child.evaluate(batch)? {
+            ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
+                let dict_array = array
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .expect("dict array");
+                let dict_values = dict_array
+                    .values()
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("strings");
+                // evaluate the regexp pattern against the dictionary values
+                let new_values = self.is_match(dict_values);
+                // convert to conventional (not dictionary-encoded) array
+                let result = take(&new_values, dict_array.keys(), None)?;
+                Ok(ColumnarValue::Array(result))
+            }
             ColumnarValue::Array(array) => {
                 let inputs = array
                     .as_any()
                     .downcast_ref::<StringArray>()
                     .expect("string array");
-                let mut builder = BooleanBuilder::with_capacity(inputs.len());
-                if inputs.is_nullable() {
-                    for i in 0..inputs.len() {
-                        if inputs.is_null(i) {
-                            builder.append_null();
-                        } else {
-                            builder.append_value(self.pattern.is_match(inputs.value(i)));
-                        }
-                    }
-                } else {
-                    for i in 0..inputs.len() {
-                        builder.append_value(self.pattern.is_match(inputs.value(i)));
-                    }
-                }
-                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+                let array = self.is_match(inputs);
+                Ok(ColumnarValue::Array(Arc::new(array)))
             }
             ColumnarValue::Scalar(_) => {
                 internal_err!("non scalar regexp patterns are not supported")
             }
diff --git a/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt b/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt
new file mode 100644
index 000000000..9e3e15bc6
--- /dev/null
+++ b/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt
@@ -0,0 +1,24 @@
+================================================================================================
+Grouped Aggregate (single group key + single aggregate SUM)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate SUM:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (SUM)                 2663  2744  115  3.9  254.0  1.0X
+SQL Parquet - Comet (Scan, Exec) (SUM)    1067  1084   24  9.8  101.8  2.5X
+
+
+================================================================================================
+Grouped Aggregate (single group key + single aggregate COUNT)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)               2532  2552  28  4.1  241.5  1.0X
+SQL Parquet - Comet (Scan, Exec) (COUNT)  4590  4592   4  2.3  437.7  0.6X
+
+
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 4a986b8c9..dd02be4f2 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -620,10 +620,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("rlike simple case") { val table = "rlike_names" - val gen = new DataGenerator(new Random(42)) Seq(false, true).foreach { withDictionary => val data = Seq("James Smith", "Michael Rose", "Rames Rose", "Rames rose") ++ - gen.generateStrings(100, "rames Rose", 12) + // add repetitive data to trigger dictionary encoding + Range(0, 100).map(_ => "John Smith") withParquetFile(data.zipWithIndex, withDictionary) { file => withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { spark.read.parquet(file).createOrReplaceTempView(table) From 8c4ffd187687f0fcbacae379ff117b66551d46ac Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 31 Jul 2024 17:03:43 -0600 Subject: [PATCH 13/15] docs --- docs/templates/compatibility-template.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/templates/compatibility-template.md b/docs/templates/compatibility-template.md index 64f871354..137870a7e 100644 --- a/docs/templates/compatibility-template.md +++ b/docs/templates/compatibility-template.md @@ -32,6 +32,12 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. +## Regular Expressions + +Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's +regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but +this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. + ## Cast Cast operations in Comet fall into three levels of support: From 4e821155953a280c3777aeb337caf33af8690aa9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 1 Aug 2024 08:46:03 -0600 Subject: [PATCH 14/15] add rlike microbenchmark --- docs/source/user-guide/compatibility.md | 2 +- .../tpcds-micro-benchmarks/rlike.sql | 20 +++++++++++++++++++ .../benchmark/CometTPCDSMicroBenchmark.scala | 3 ++- 3 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index 574f56d45..0af44eb62 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -34,7 +34,7 @@ There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where ## Regular Expressions -Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's +Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql b/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql new file mode 100644 index 000000000..710379d6b --- /dev/null +++ b/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- This is not part of TPC-DS but runs on TPC-DS data + +SELECT i_item_desc, i_item_desc RLIKE '[A-Z][aeiou]+' FROM item; \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index 22d846701..da37de5c9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -69,7 +69,8 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { "join_exploding_output", "join_inner", "join_left_outer", - "join_semi") + "join_semi", + "rlike") override def runQueries( queryLocation: String, From 5dcd8fa19b7fac133c1c982a6f75667d16c1c0aa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 1 Aug 2024 08:56:53 -0600 Subject: [PATCH 15/15] enable rlike --- .../apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index da37de5c9..074d424be 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -111,6 +111,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { benchmark.addCase(s"$name$nameSuffix: Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { cometSpark.sql(queryString).noop()
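The anchor discrepancy that the `rlike whitespace` test works around (the `Smith$` pattern stays commented out there because "$ produces different results") can be reproduced outside of Comet. A minimal standalone sketch, not part of the patch series, assuming only the same `regex` crate that `RLike::try_new` compiles patterns with:

```rust
use regex::Regex;

fn main() {
    let input = "James\nSmith\n";

    // Java's Pattern lets `$` match just before a final line terminator, so
    // Pattern.compile("Smith$").matcher("James\nSmith\n").find() returns true,
    // while the Rust regex crate only matches `$` at the very end of the
    // haystack, so the same RLIKE predicate evaluates to false natively:
    let dollar = Regex::new("Smith$").unwrap();
    assert!(!dollar.is_match(input));

    // `\z` (end of input) agrees across both engines, which is why the test
    // keeps the `Smith\z` pattern enabled while `Smith$` is disabled.
    let z = Regex::new(r"Smith\z").unwrap();
    assert!(!z.is_match(input));
    assert!(z.is_match("James\nSmith"));
}
```

This is exactly the kind of pattern that `RegExp.isSupportedPattern` could eventually detect, falling back to Spark (or transpiling the pattern) only for such cases instead of requiring `spark.comet.regexp.allowIncompatible=true`, as the comments in patch 03 suggest.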
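Similarly, the dictionary-encoded path from the "add dictionary support" commit can be sketched in isolation. This is an illustrative sketch only (it assumes current arrow-rs APIs, and the keys and values are invented); the design point is that the pattern is matched once per distinct dictionary value and the per-value results are then expanded back to row order with the `take` kernel, rather than re-running the regex for every row:

```rust
use std::sync::Arc;

use arrow::array::{BooleanArray, DictionaryArray, Int32Array, StringArray};
use arrow::compute::take;
use arrow::datatypes::Int32Type;
use regex::Regex;

fn main() {
    let pattern = Regex::new("^J").unwrap();

    // five rows, but only two distinct values
    let keys = Int32Array::from(vec![0, 1, 0, 0, 1]);
    let values = Arc::new(StringArray::from(vec!["John Smith", "Michael Rose"]));
    let dict = DictionaryArray::<Int32Type>::try_new(keys, values).unwrap();

    // match the pattern once per distinct value...
    let strings = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("strings");
    let matched: BooleanArray = strings
        .iter()
        .map(|v| v.map(|s| pattern.is_match(s)))
        .collect();

    // ...then expand the per-value results back to row order,
    // mirroring what RLike::evaluate does for dictionary input
    let result = take(&matched, dict.keys(), None).unwrap();
    println!("{result:?}"); // rows: true, false, true, true, false
}
```

This is why the `rlike simple case` test adds repetitive data ("add repetitive data to trigger dictionary encoding") and runs with `withDictionary` both on and off: the two branches of `evaluate` must produce identical results.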