-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32512][SQL] add alter table add/drop partition command for datasourcev2 #29339
Changes from 14 commits
a57aafc
6efca68
f0bc357
61cae52
b1fc84b
67fcb12
f4a6ee3
9bd20ba
6327ead
800c51a
60b0a12
a4c29a2
a6caf68
3405f5b
9bc2e76
0740ef5
ad40d7b
8fce669
af4b50b
b046909
fbc2b58
dedb32a
41cf069
96a62be
d5d8f13
6bf49bd
cdd7085
0545538
f1fcac1
7014ba1
69bbbd5
effd0ed
7377469
c6711cd
dcd5060
d316e56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3418,7 +3418,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging | |
spec -> location | ||
} | ||
AlterTableAddPartitionStatement( | ||
visitMultipartIdentifier(ctx.multipartIdentifier), | ||
UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)), | ||
specsAndLocs.toSeq, | ||
ctx.EXISTS != null) | ||
} | ||
|
@@ -3458,7 +3458,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging | |
operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx) | ||
} | ||
AlterTableDropPartitionStatement( | ||
visitMultipartIdentifier(ctx.multipartIdentifier), | ||
UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).toSeq, | ||
ifExists = ctx.EXISTS != null, | ||
purge = ctx.PURGE != null, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
package org.apache.spark.sql.catalyst.plans.logical | ||
|
||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException} | ||
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec | ||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable} | ||
|
@@ -551,3 +552,31 @@ case class ShowFunctions( | |
pattern: Option[String]) extends Command { | ||
override def children: Seq[LogicalPlan] = child.toSeq | ||
} | ||
|
||
/** | ||
* The logical plan of the ALTER TABLE ADD PARTITION command that works for v2 tables. | ||
* | ||
* The syntax of this command is: | ||
* {{{ | ||
* ALTER TABLE table ADD [IF NOT EXISTS] | ||
* PARTITION spec1 [LOCATION 'loc1'][, PARTITION spec2 [LOCATION 'loc2'], ...]; | ||
* }}} | ||
*/ | ||
case class AlterTableAddPartition( | ||
table: SupportsPartitionManagement, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should make this Then you can move the logics in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason why I keep There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point and does reveal a flaw in the current framework. We can add some util classes to help us resolve partition spec
then the command can be defined as
And we add a rule to match |
||
parts: Seq[(InternalRow, Map[String, String])], | ||
ignoreIfExists: Boolean) extends Command | ||
|
||
/**
 * Logical plan for the ALTER TABLE DROP PARTITION command on v2 tables.
 * Running it may remove both the data and the metadata of the affected partition.
 *
 * Command syntax:
 * {{{
 *     ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
 * }}}
 *
 * @param table the table to alter, typed as [[SupportsPartitionManagement]]
 * @param partIdents one [[InternalRow]] per partition named in the command
 *                   (presumably the resolved partition identifiers; confirm against the
 *                   rule that builds this plan)
 * @param ignoreIfNotExists presumably set when IF EXISTS is present; confirm against the
 *                          rule that builds this plan
 */
case class AlterTableDropPartition(
    table: SupportsPartitionManagement,
    partIdents: Seq[InternalRow],
    ignoreIfNotExists: Boolean)
  extends Command
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2 | |
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability} | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec | ||
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability} | ||
import org.apache.spark.sql.types.{ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructType} | ||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
|
||
object DataSourceV2Implicits { | ||
|
@@ -52,6 +55,26 @@ object DataSourceV2Implicits { | |
} | ||
} | ||
|
||
/**
 * Returns `table` typed as [[SupportsPartitionManagement]].
 *
 * @throws AnalysisException if `table` is not a [[SupportsPartitionManagement]]
 */
def asPartitionable: SupportsPartitionManagement = table match {
  case partitionable: SupportsPartitionManagement => partitionable
  case _ =>
    throw new AnalysisException(
      s"Table does not support partition management: ${table.name}")
}
|
||
/**
 * Returns `table` typed as [[SupportsAtomicPartitionManagement]].
 *
 * @throws AnalysisException if `table` is not a [[SupportsAtomicPartitionManagement]]
 */
def asAtomicPartitionable: SupportsAtomicPartitionManagement = table match {
  case atomicPartitionable: SupportsAtomicPartitionManagement => atomicPartitionable
  case _ =>
    throw new AnalysisException(
      s"Table does not support atomic partition management: ${table.name}")
}
|
||
def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability) | ||
|
||
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports) | ||
|
@@ -62,4 +85,42 @@ object DataSourceV2Implicits { | |
new CaseInsensitiveStringMap(options.asJava) | ||
} | ||
} | ||
|
||
implicit class TablePartitionSpecHelper(partSpec: TablePartitionSpec) { | ||
def asPartitionIdentifier(partSchema: StructType): InternalRow = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's move it to |
||
val conflictKeys = partSpec.keys.toSeq.diff(partSchema.map(_.name)) | ||
if (conflictKeys.nonEmpty) { | ||
throw new AnalysisException(s"Partition key ${conflictKeys.mkString(",")} not exists") | ||
} | ||
|
||
val partValues = partSchema.map { part => | ||
val partValue = partSpec.get(part.name).orNull | ||
if (partValue == null) { | ||
null | ||
} else { | ||
// TODO: Support other datatypes, such as DateType | ||
part.dataType match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about other types like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a |
||
case _: ByteType => | ||
partValue.toByte | ||
case _: ShortType => | ||
partValue.toShort | ||
case _: IntegerType => | ||
partValue.toInt | ||
case _: LongType => | ||
partValue.toLong | ||
case _: FloatType => | ||
partValue.toFloat | ||
case _: DoubleType => | ||
partValue.toDouble | ||
case _: StringType => | ||
partValue | ||
case _ => | ||
throw new AnalysisException( | ||
s"Type ${part.dataType.typeName} is not supported for partition.") | ||
} | ||
} | ||
} | ||
InternalRow.fromSeq(partValues) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,4 +92,8 @@ class InMemoryPartitionTable( | |
|
||
override def partitionExists(ident: InternalRow): Boolean = | ||
memoryTablePartitions.containsKey(ident) | ||
|
||
def clearPartitions(): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can remove it now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
memoryTablePartitions.clear() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.connector | ||
|
||
import java.util | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException | ||
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, Table} | ||
import org.apache.spark.sql.connector.expressions.Transform | ||
import org.apache.spark.sql.types.StructType | ||
|
||
/**
 * An [[InMemoryTableCatalog]] whose `createTable` registers an
 * `InMemoryAtomicPartitionTable` for every table it creates.
 */
class InMemoryPartitionTableCatalog extends InMemoryTableCatalog {
  import CatalogV2Implicits._

  /**
   * Creates an `InMemoryAtomicPartitionTable` named `<catalog name>.<quoted ident>` and
   * stores it in `tables` under `ident`.
   *
   * @throws TableAlreadyExistsException if `tables` already has an entry for `ident`
   */
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    val alreadyRegistered = tables.containsKey(ident)
    if (alreadyRegistered) {
      throw new TableAlreadyExistsException(ident)
    }

    // Runs after the existence check and before anything is constructed or registered.
    InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)

    val qualifiedName = s"$name.${ident.quoted}"
    val created = new InMemoryAtomicPartitionTable(qualifiedName, schema, partitions, properties)
    tables.put(ident, created)
    // Adds an empty entry for the table's namespace only when none is present.
    namespaces.putIfAbsent(ident.namespace.toList, Map())
    created
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | |
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table} | ||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCatalog, TableChange, V1Table} | ||
import org.apache.spark.sql.connector.expressions.Transform | ||
import org.apache.spark.sql.execution.command._ | ||
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} | ||
|
@@ -497,10 +497,11 @@ class ResolveSessionCatalog( | |
v1TableName.asTableIdentifier, | ||
"ALTER TABLE RECOVER PARTITIONS") | ||
|
||
case AlterTableAddPartitionStatement(tbl, partitionSpecsAndLocs, ifNotExists) => | ||
val v1TableName = parseV1Table(tbl, "ALTER TABLE ADD PARTITION") | ||
case AlterTableAddPartitionStatement( | ||
r @ ResolvedTable(_, _, _: V1Table), partitionSpecsAndLocs, ifNotExists) | ||
if isSessionCatalog(r.catalog) => | ||
AlterTableAddPartitionCommand( | ||
v1TableName.asTableIdentifier, | ||
r.identifier.asTableIdentifier, | ||
partitionSpecsAndLocs, | ||
ifNotExists) | ||
|
||
|
@@ -511,10 +512,11 @@ class ResolveSessionCatalog( | |
from, | ||
to) | ||
|
||
case AlterTableDropPartitionStatement(tbl, specs, ifExists, purge, retainData) => | ||
val v1TableName = parseV1Table(tbl, "ALTER TABLE DROP PARTITION") | ||
case AlterTableDropPartitionStatement( | ||
r @ ResolvedTable(_, _, _: V1Table), specs, ifExists, purge, retainData) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
if isSessionCatalog(r.catalog) => | ||
AlterTableDropPartitionCommand( | ||
v1TableName.asTableIdentifier, | ||
r.identifier.asTableIdentifier, | ||
specs, | ||
ifExists, | ||
purge, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't support views; we should use
UnresolvedTable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. Maybe it is a good idea to do this in another PR, as there are other commands that use this, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't change behavior in this PR. If
AlterTableAddPartition
failed before when the name refers to a view, we should use UnresolvedTable
to keep the failing behavior. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok