From e40e94deb847a147e6d8eaa1e7268f5c8686be32 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 4 Aug 2024 15:43:55 -0700
Subject: [PATCH] fix: Fallback to Spark when shuffling on struct with
 duplicate field name

---
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala      |  4 +++-
 .../apache/comet/exec/CometColumnarShuffleSuite.scala      | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ad53c84af..8f08eeba8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2936,7 +2936,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           _: DateType | _: BooleanType =>
         true
       case StructType(fields) =>
-        fields.forall(f => supportedDataType(f.dataType))
+        fields.forall(f => supportedDataType(f.dataType)) &&
+          // Java Arrow stream reader cannot work on duplicate field name
+          fields.map(f => f.name).distinct.length == fields.length
       case ArrayType(ArrayType(_, _), _) => false // TODO: nested array is not supported
       case ArrayType(MapType(_, _, _), _) => false // TODO: map array element is not supported
       case ArrayType(elementType, _) =>
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 9ae882d86..78b4bbb91 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -68,6 +68,16 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
 
   setupTestData()
 
+  test("Fallback to Spark when shuffling on struct with duplicate field name") {
+    val df = sql("""
+        | SELECT max(struct(a, record.*, b)) as r FROM
+        |   (select a as a, b as b, struct(a,b) as record from testData2) tmp
+        | GROUP BY a
+      """.stripMargin).select($"r.*")
+
+    checkSparkAnswer(df)
+  }
+
   test("Unsupported types for SinglePartition should fallback to Spark") {
     checkSparkAnswer(spark.sql("""
       |SELECT