
Commit

fix: Sort on single struct should fallback to Spark (apache#811)
(cherry picked from commit 071c780)
viirya authored and huaxingao committed Aug 22, 2024
1 parent 394a124 commit 8452ef8
Showing 3 changed files with 28 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -63,8 +63,8 @@ Comet provides the following configuration settings.
| spark.comet.nativeLoadRequired | Whether to require the Comet native library to load successfully when Comet is enabled. If not, Comet will silently fall back to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable the pre-fetching feature of CometScan. It is disabled by default. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads mean a larger memory requirement to store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
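For context (not part of the diff), these keys are set like any other Spark configuration. A minimal Scala sketch, assuming the Comet jar and plugin settings for the session are already in place; the values shown are purely illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative values only; the key names come from the table above.
val spark = SparkSession.builder()
  .appName("comet-config-example")
  .master("local[*]")
  .config("spark.comet.scan.enabled", "true")
  .config("spark.comet.scan.preFetch.enabled", "true")
  .config("spark.comet.scan.preFetch.threadNum", "4")
  .config("spark.comet.shuffle.preferDictionary.ratio", "10.0")
  .getOrCreate()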
7 changes: 7 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2501,6 +2501,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

      case SortExec(sortOrder, _, child, _)
          if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT) =>
        // TODO: Remove this constraint when we upgrade to new arrow-rs including
        // https://github.com/apache/arrow-rs/pull/6225
        if (child.output.length == 1 && child.output.head.dataType.isInstanceOf[StructType]) {
          withInfo(op, "Sort on single struct column is not supported")
          return None
        }

        val sortOrders = sortOrder.map(exprToProto(_, child.output))

        if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
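The guard above is the core of the fix on the planning side. As a standalone sketch (not code from the commit; the helper name is hypothetical), the condition it checks amounts to:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the guard: leave the sort to Spark when the
// child plan produces exactly one column and that column is a struct.
def sortsOnLoneStructColumn(output: Seq[Attribute]): Boolean =
  output.length == 1 && output.head.dataType.isInstanceOf[StructType]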
20 changes: 20 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -64,6 +64,26 @@ class CometExecSuite extends CometTestBase {
}
}

test("Sort on single struct should fallback to Spark") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val data =
Seq(Tuple1(null), Tuple1((1, "a")), Tuple1((2, null)), Tuple1((3, "b")), Tuple1(null))

withParquetFile(data) { file =>
readParquetFile(file) { df =>
val sort = df.sort("_1")
checkSparkAnswer(sort)
}
}
}
}

test("Native window operator should be CometUnaryExec") {
withTempView("testData") {
sql("""
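The fallback can also be observed from the outside. A rough sketch (not part of the commit; it assumes the Comet jar and plugin are configured for the session and uses a hypothetical scratch path): sort a lone struct column read from Parquet and inspect the physical plan, which should contain Spark's SortExec rather than a Comet sort operator.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("struct-sort-fallback-check")
  .master("local[*]")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .getOrCreate()
import spark.implicits._

// Write a Parquet file whose only column is a struct, then sort on it.
val path = "/tmp/single_struct_col" // hypothetical scratch path
Seq(Tuple1((1, "a")), Tuple1((2, "b"))).toDF().write.mode("overwrite").parquet(path)

val sorted = spark.read.parquet(path).sort("_1")
sorted.explain() // expected to show Spark's SortExec for the sort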
