From aadda4b561ace638fb88147a93b5e15db3527d5a Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 29 Dec 2020 23:26:27 +0900
Subject: [PATCH] [SPARK-33930][SQL] Script Transform default FIELD DELIMIT
 should be \u0001 for no serde

### What changes were proposed in this pull request?
For the same SQL:
```
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
USING 'cat'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '&'
FROM (select 1 as a, 2 as b, 3 as c) t
```
In Hive:
```
hive> SELECT TRANSFORM(a, b, c, null)
    > ROW FORMAT DELIMITED
    > USING 'cat'
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '&'
    > FROM (select 1 as a, 2 as b, 3 as c) t;
OK
123\N	NULL
Time taken: 14.519 seconds, Fetched: 1 row(s)
```
In Spark:
```
Spark master: local[*], Application Id: local-1609225830376
spark-sql> SELECT TRANSFORM(a, b, c, null)
         > ROW FORMAT DELIMITED
         > USING 'cat'
         > ROW FORMAT DELIMITED
         > FIELDS TERMINATED BY '&'
         > FROM (select 1 as a, 2 as b, 3 as c) t;
1	2	3	null	NULL
Time taken: 4.297 seconds, Fetched 1 row(s)
```
Spark should behave the same way, so this PR changes the default ROW FORMAT FIELD DELIMIT to `\u0001` for the no-serde mode. In Hive the default value is `'1'`, i.e. the character `'\u0001'`:
```
bucket_count              -1
column.name.delimiter     ,
columns
columns.comments
columns.types
file.inputformat          org.apache.hadoop.hive.ql.io.NullRowsInputFormat
```
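To make the change concrete: in no-serde mode the script transform hands each input row to the child process as delimiter-joined text, which is exactly where the `TOK_TABLEROWFORMATFIELD` default applies. Below is a minimal, self-contained sketch of that serialization under the new default; it is illustrative only (the object and method names here are invented), not the actual `BaseScriptTransformationExec` implementation:
```
// Illustrative sketch only, NOT Spark's actual writer. Shows why the default
// field delimiter matters in no-serde mode: fields are joined with it before
// the line is piped to the user script, and NULL becomes the Hive-style "\N".
object NoSerdeWriterSketch {
  val fieldDelimiter = "\u0001" // new default for TOK_TABLEROWFORMATFIELD
  val lineDelimiter = "\n"      // default for TOK_TABLEROWFORMATLINES
  val nullMarker = "\\N"        // textual NULL marker, as in Hive

  // Join one row's stringified fields into the line handed to the script.
  def serialize(row: Seq[Any]): String =
    row.map {
      case null => nullMarker
      case v => v.toString
    }.mkString("", fieldDelimiter, lineDelimiter)
}

// The old "\t" default produced "1\t2\t3\t\N\n" for (1, 2, 3, null);
// the new "\u0001" default produces "1\u00012\u00013\u0001\N\n", matching Hive.
println(NoSerdeWriterSketch.serialize(Seq(1, 2, 3, null)))
```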
### Why are the changes needed?
Keep the same behavior as Hive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #30958 from AngersZhuuuu/SPARK-33930.

Authored-by: angerszhu
Signed-off-by: HyukjinKwon
---
 docs/sql-migration-guide.md                    |  2 ++
 .../BaseScriptTransformationExec.scala         |  2 +-
 .../BaseScriptTransformationSuite.scala        | 32 ++++++++++++++++++-
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index cbb1de53c8896..bd54554baa09d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,6 +30,8 @@ license: |
 
   - In Spark 3.2, `ALTER TABLE .. RENAME TO PARTITION` throws `PartitionAlreadyExistsException` instead of `AnalysisException` for tables from Hive external when the target partition already exists.
 
+  - In Spark 3.2, the script transform default FIELD DELIMIT is `\u0001` for no-serde mode. In Spark 3.1 or earlier, the default FIELD DELIMIT is `\t`.
+
 ## Upgrading from Spark SQL 3.0 to 3.1
 
   - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 74e5aa716ad67..1c87c48ae7cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -335,7 +335,7 @@ case class ScriptTransformationIOSchema(
 
 object ScriptTransformationIOSchema {
   val defaultFormat = Map(
-    ("TOK_TABLEROWFORMATFIELD", "\t"),
+    ("TOK_TABLEROWFORMATFIELD", "\u0001"),
     ("TOK_TABLEROWFORMATLINES", "\n")
   )
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index 863657a7862a6..cf9ee1ef6db72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark.{SparkException, TaskContext, TestUtils}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -123,7 +124,11 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
       s"""
         |SELECT
         |TRANSFORM(a, b, c, d, e)
-        |USING 'python $scriptFilePath' AS (a, b, c, d, e)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY '\t'
+        |  USING 'python $scriptFilePath' AS (a, b, c, d, e)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY '\t'
        |FROM v
      """.stripMargin)
 
@@ -440,6 +445,31 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
       }
     }
   }
+
+  test("SPARK-33930: Script Transform default FIELD DELIMIT should be \u0001 (no serde)") {
+    withTempView("v") {
+      val df = Seq(
+        (1, 2, 3),
+        (2, 3, 4),
+        (3, 4, 5)
+      ).toDF("a", "b", "c")
+      df.createTempView("v")
+
+      checkAnswer(
+        sql(
+          s"""
+            |SELECT TRANSFORM(a, b, c)
+            |  ROW FORMAT DELIMITED
+            |  USING 'cat' AS (a)
+            |  ROW FORMAT DELIMITED
+            |  FIELDS TERMINATED BY '&'
+            |FROM v
+          """.stripMargin), identity,
+        Row("1\u00012\u00013") ::
+          Row("2\u00013\u00014") ::
+          Row("3\u00014\u00015") :: Nil)
+    }
+  }
 }
 
 case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
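Since this changes a default, workloads that depended on the old tab delimiter in no-serde mode can pin the previous behavior by declaring the delimiter explicitly on both sides of the transform, as the migration guide entry above suggests. A small sketch, assuming an existing `SparkSession` named `spark` and a registered temp view `v` with columns `a`, `b`, `c`:
```
// Sketch: keep the pre-3.2 tab delimiter by spelling it out explicitly.
// Assumes `spark: SparkSession` and a registered temp view `v`.
val restored = spark.sql(
  """
    |SELECT TRANSFORM(a, b, c)
    |  ROW FORMAT DELIMITED
    |  FIELDS TERMINATED BY '\t'
    |  USING 'cat' AS (a, b, c)
    |  ROW FORMAT DELIMITED
    |  FIELDS TERMINATED BY '\t'
    |FROM v
  """.stripMargin)
restored.show()
```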