
Commit aad72ad

huaxingao authored and chenzhx committed
[SPARK-37627][SQL] Add sorted column in BucketTransform
### What changes were proposed in this pull request?
In V1, we can create a table with a sorted bucket like the following:
```
sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
  "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
```
However, creating a table with a sorted bucket in V2 fails with the exception `org.apache.spark.sql.AnalysisException: Cannot convert bucketing with sort columns to a transform.`

### Why are the changes needed?
This PR adds the sorted columns to BucketTransform so we can create tables in V2 with sorted buckets.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT

Closes apache#34879 from huaxingao/sortedBucket.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent abf7662 commit aad72ad
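
For context, a minimal sketch (not part of the patch) of the V2 statement this change enables. The catalog name `testcat` is an assumption borrowed from the new test added below; before this change the statement failed with the `AnalysisException` quoted in the commit message.

```scala
// Hypothetical usage sketch: assumes a DataSourceV2 catalog has been registered
// under the name `testcat` (e.g. via spark.sql.catalog.testcat).
spark.sql(
  "CREATE TABLE testcat.tbl (a INT, b INT) " +
    "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
// Previously: org.apache.spark.sql.AnalysisException:
//   Cannot convert bucketing with sort columns to a transform.
// With this patch the table is created and the bucket transform carries the
// sort column, expected to describe as bucket(5, a, b).
```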

File tree: 7 files changed (+57 −23 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala

+5 −4

```diff
@@ -38,12 +38,13 @@ private[sql] object CatalogV2Implicits {

   implicit class BucketSpecHelper(spec: BucketSpec) {
     def asTransform: BucketTransform = {
+      val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
       if (spec.sortColumnNames.nonEmpty) {
-        throw QueryCompilationErrors.cannotConvertBucketWithSortColumnsToTransformError(spec)
+        val sortedCol = spec.sortColumnNames.map(col => reference(Seq(col)))
+        bucket(spec.numBuckets, references.toArray, sortedCol.toArray)
+      } else {
+        bucket(spec.numBuckets, references.toArray)
       }
-
-      val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
-      bucket(spec.numBuckets, references.toArray)
     }
   }

```
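
For illustration, a minimal sketch of what the rewritten `asTransform` is expected to produce (an assumption based on the hunk above; `CatalogV2Implicits` and these expression helpers are `private[sql]`, so this only compiles inside Spark's own sql modules or tests).

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// A BucketSpec carrying sort columns now converts to a transform instead of throwing.
val spec = BucketSpec(numBuckets = 5, bucketColumnNames = Seq("a"), sortColumnNames = Seq("b"))
val transform = spec.asTransform
// Expected to render both the bucket and the sort columns, e.g. "bucket(5, a, b)".
println(transform.describe)
```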

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala

+28 −8

```diff
@@ -45,6 +45,12 @@ private[sql] object LogicalExpressions {
   def bucket(numBuckets: Int, references: Array[NamedReference]): BucketTransform =
     BucketTransform(literal(numBuckets, IntegerType), references)

+  def bucket(
+      numBuckets: Int,
+      references: Array[NamedReference],
+      sortedCols: Array[NamedReference]): BucketTransform =
+    BucketTransform(literal(numBuckets, IntegerType), references, sortedCols)
+
   def identity(reference: NamedReference): IdentityTransform = IdentityTransform(reference)

   def years(reference: NamedReference): YearsTransform = YearsTransform(reference)
@@ -97,7 +103,8 @@ private[sql] abstract class SingleColumnTransform(ref: NamedReference) extends R

 private[sql] final case class BucketTransform(
     numBuckets: Literal[Int],
-    columns: Seq[NamedReference]) extends RewritableTransform {
+    columns: Seq[NamedReference],
+    sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {

   override val name: String = "bucket"

@@ -107,7 +114,13 @@ private[sql] final case class BucketTransform(

   override def arguments: Array[Expression] = numBuckets +: columns.toArray

-  override def describe: String = s"bucket(${arguments.map(_.describe).mkString(", ")})"
+  override def describe: String =
+    if (sortedColumns.nonEmpty) {
+      s"bucket(${arguments.map(_.describe).mkString(", ")}," +
+        s" ${sortedColumns.map(_.describe).mkString(", ")})"
+    } else {
+      s"bucket(${arguments.map(_.describe).mkString(", ")})"
+    }

   override def toString: String = describe

@@ -117,23 +130,30 @@ private[sql] final case class BucketTransform(
 }

 private[sql] object BucketTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference)] = expr match {
+  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] =
+    expr match {
     case transform: Transform =>
       transform match {
-        case BucketTransform(n, FieldReference(parts)) =>
-          Some((n, FieldReference(parts)))
+        case BucketTransform(n, FieldReference(parts), FieldReference(sortCols)) =>
+          Some((n, FieldReference(parts), FieldReference(sortCols)))
         case _ =>
           None
       }
     case _ =>
       None
   }

-  def unapply(transform: Transform): Option[(Int, NamedReference)] = transform match {
+  def unapply(transform: Transform): Option[(Int, NamedReference, NamedReference)] =
+    transform match {
+    case NamedTransform("bucket", Seq(
+        Lit(value: Int, IntegerType),
+        Ref(partCols: Seq[String]),
+        Ref(sortCols: Seq[String]))) =>
+      Some((value, FieldReference(partCols), FieldReference(sortCols)))
     case NamedTransform("bucket", Seq(
         Lit(value: Int, IntegerType),
-        Ref(seq: Seq[String]))) =>
-      Some((value, FieldReference(seq)))
+        Ref(partCols: Seq[String]))) =>
+      Some((value, FieldReference(partCols), FieldReference(Seq.empty[String])))
     case _ =>
       None
   }
```
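
Similarly, a small sketch of the new `bucket` overload and the extended `describe` (again only an illustration, under the same `private[sql]` caveat, mirroring the columns used in the new test at the end of this commit).

```scala
import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference}

// Bucket transform over column b with sort column a, as in the new test below.
val bt = LogicalExpressions.bucket(
  4,
  Array[NamedReference](FieldReference(Seq("b"))),
  Array[NamedReference](FieldReference(Seq("a"))))
// With sortedColumns non-empty, describe appends the sort columns.
// Expected output: bucket(4, b, a)
println(bt.describe)
```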

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

+1 −6

```diff
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -1371,11 +1371,6 @@ object QueryCompilationErrors {
     new AnalysisException("Cannot use interval type in the table schema.")
   }

-  def cannotConvertBucketWithSortColumnsToTransformError(spec: BucketSpec): Throwable = {
-    new AnalysisException(
-      s"Cannot convert bucketing with sort columns to a transform: $spec")
-  }
-
   def cannotConvertTransformsToPartitionColumnsError(nonIdTransforms: Seq[Transform]): Throwable = {
     new AnalysisException("Transforms cannot be converted to partition columns: " +
       nonIdTransforms.map(_.describe).mkString(", "))
```

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala

+1 −1

```diff
@@ -161,7 +161,7 @@ class InMemoryTable(
         case (v, t) =>
           throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
       }
-    case BucketTransform(numBuckets, ref) =>
+    case BucketTransform(numBuckets, ref, _) =>
       val (value, dataType) = extractor(ref.fieldNames, cleanedSchema, row)
       val valueHashCode = if (value == null) 0 else value.hashCode
       ((valueHashCode + 31 * dataType.hashCode()) & Integer.MAX_VALUE) % numBuckets
```

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala

+2 −2

```diff
@@ -139,15 +139,15 @@ class TransformExtractorSuite extends SparkFunSuite {
     }

     bucketTransform match {
-      case BucketTransform(numBuckets, FieldReference(seq)) =>
+      case BucketTransform(numBuckets, FieldReference(seq), _) =>
         assert(numBuckets === 16)
         assert(seq === Seq("a", "b"))
       case _ =>
         fail("Did not match BucketTransform extractor")
     }

     transform("unknown", ref("a", "b")) match {
-      case BucketTransform(_, _) =>
+      case BucketTransform(_, _, _) =>
         fail("Matched unknown transform")
       case _ =>
       // expected
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

+2 −2

```diff
@@ -293,8 +293,8 @@ private[sql] object V2SessionCatalog {
       case IdentityTransform(FieldReference(Seq(col))) =>
         identityCols += col

-      case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
-        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
+      case BucketTransform(numBuckets, FieldReference(Seq(col)), FieldReference(Seq(sortCol))) =>
+        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, sortCol :: Nil))

       case transform =>
         throw QueryExecutionErrors.unsupportedPartitionTransformError(transform)
```

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

+18

```diff
@@ -1609,6 +1609,24 @@ class DataSourceV2SQLSuite
     }
   }

+  test("create table using - with sorted bucket") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      sql(s"CREATE TABLE $identifier (a int, b string, c int) USING $v2Source PARTITIONED BY (c)" +
+        s" CLUSTERED BY (b) SORTED by (a) INTO 4 BUCKETS")
+      val table = getTableMetadata(identifier)
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "c")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, b, a)")
+    }
+  }
+
   test("REFRESH TABLE: v2 table") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
```
