fix: address post merge comet-parquet-exec review comments (#1327)
- remove CometArrowUtils
- add (ignored) v2 tests for get_struct_field

## Which issue does this PR close?

Addresses review comments from: #1318
parthchandra authored Jan 23, 2025
1 parent 924efe9 commit a53f0b5
Showing 3 changed files with 35 additions and 206 deletions.
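The new `testV1AndV2` helper added to `CometExpressionSuite` (diff below) runs each test body twice: once against the DataSource V1 parquet path and once, currently ignored, against the V2 path. Both variants toggle the same Spark config, `spark.sql.sources.useV1SourceList`. The following standalone sketch of that toggle is illustrative only and is not part of this commit; the output path and object name are hypothetical, and it assumes a local SparkSession.

```scala
import org.apache.spark.sql.SparkSession

object V1V2ToggleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("v1-v2-toggle").getOrCreate()
    import spark.implicits._

    // Write a small nested dataset so the struct-field projection below has something to read.
    val path = "/tmp/v1v2-toggle-example" // hypothetical path
    Seq((1L, 10L), (2L, 20L))
      .toDF("id", "inner")
      .selectExpr("id", "named_struct('id', inner) AS nested1")
      .write.mode("overwrite").parquet(path)

    // Listing "parquet" keeps parquet reads on the DataSource V1 path (FileSourceScanExec).
    spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
    spark.read.parquet(path).select("nested1.id").explain()

    // An empty list routes parquet reads through DataSource V2 (BatchScanExec). The plan
    // shape differs, which is why the V2 variants of the tests below are ignored for now.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.read.parquet(path).select("nested1.id").explain()

    spark.stop()
  }
}
```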
@@ -57,8 +57,8 @@
import org.apache.spark.TaskContext$;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.comet.CometArrowUtils;
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
import org.apache.spark.sql.comet.util.Utils$;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.metric.SQLMetric;
@@ -260,7 +260,7 @@ public void init() throws URISyntaxException, IOException {
} ////// End get requested schema

String timeZoneId = conf.get("spark.sql.session.timeZone");
Schema arrowSchema = CometArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, timeZoneId);
ByteArrayOutputStream out = new ByteArrayOutputStream();
WriteChannel writeChannel = new WriteChannel(Channels.newChannel(out));
MessageSerializer.serialize(writeChannel, arrowSchema);
180 changes: 0 additions & 180 deletions common/src/main/scala/org/apache/spark/sql/comet/CometArrowUtils.scala

This file was deleted.
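With `CometArrowUtils` removed, callers go through the existing `org.apache.spark.sql.comet.util.Utils` helper instead, accessed as `Utils$.MODULE$` from the Java reader above. Below is a minimal Scala usage sketch; it assumes only the two-argument `toArrowSchema(sparkSchema, timeZoneId)` signature and the Arrow `Schema` return type visible at that call site, and the schema and object name are made up for illustration.

```scala
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object ToArrowSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Spark schema with a nested struct, mirroring the shape used in the tests below.
    val sparkSchema: StructType = new StructType()
      .add("id", LongType)
      .add("nested1", new StructType().add("id", LongType).add("name", StringType))

    // Convert to an Arrow schema; the time zone string corresponds to
    // "spark.sql.session.timeZone" in the reader code above.
    val arrowSchema: Schema = Utils.toArrowSchema(sparkSchema, "UTC")
    println(arrowSchema)
  }
}
```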

57 changes: 33 additions & 24 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2307,7 +2307,22 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("get_struct_field with DataFusion ParquetExec - simple case") {
private def testV1AndV2(testName: String)(f: => Unit): Unit = {
test(s"$testName - V1") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { f }
}

// The V2 test is ignored because the V2 read path produces a different plan, so the
// operator check fails. We could make the test pass by skipping the operator check, but
// once V2 is supported we want that check enabled, and marking the test as ignored makes
// the gap more obvious.
//
ignore(s"$testName - V2") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { f }
}
}

testV1AndV2("get_struct_field with DataFusion ParquetExec - simple case") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2320,21 +2335,18 @@
df.write.parquet(dir.toString())
}

Seq("parquet").foreach { v1List =>
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {

val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
}
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
}
}
}

test("get_struct_field with DataFusion ParquetExec - select subset of struct") {
testV1AndV2("get_struct_field with DataFusion ParquetExec - select subset of struct") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2353,22 +2365,19 @@
df.write.parquet(dir.toString())
}

Seq("parquet").foreach { v1List =>
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {

val df = spark.read.parquet(dir.toString())
val df = spark.read.parquet(dir.toString())

checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.id"))

checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))

// unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ...
// checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
}
// unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ...
// checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
}
}
}
@@ -2393,7 +2402,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
df.write.parquet(dir.toString())
}

Seq("parquet").foreach { v1List =>
Seq("", "parquet").foreach { v1List =>
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",