[SPARK-32512][SQL] add alter table add/drop partition command for datasourcev2 #29339

Closed
jackylee-ch wants to merge 36 commits into master from stczwd:SPARK-32512-new
Changes from 2 commits
Commits (36)
a57aafc
add table partition API
jackylee-ch Aug 3, 2020
6efca68
add AlterTableAddPartitionExec and AlterTableDropPartitionExec
jackylee-ch Aug 3, 2020
f0bc357
redefine AlterTableAddPartitionExec and AlterTableDropPartitionExec
jackylee-ch Aug 4, 2020
61cae52
fix test failed
jackylee-ch Aug 5, 2020
b1fc84b
change the warning for purge
jackylee-ch Aug 5, 2020
67fcb12
add SupportsAtomicPartitionManagement API support
jackylee-ch Aug 6, 2020
f4a6ee3
reorder match cases
jackylee-ch Aug 6, 2020
9bd20ba
fix errors
jackylee-ch Aug 6, 2020
6327ead
change match cases
jackylee-ch Aug 6, 2020
800c51a
add alter table add/drop partitions suites
jackylee-ch Aug 7, 2020
60b0a12
Merge branch 'master' into SPARK-32512-new
jackylee-ch Aug 13, 2020
a4c29a2
restart git action
jackylee-ch Aug 21, 2020
a6caf68
change code for comments
jackylee-ch Sep 27, 2020
3405f5b
use UnresolvedTableOrView to resolve the identifier
jackylee-ch Sep 27, 2020
9bc2e76
add children for AlterTableAddPartitionsStatement and AlterTableDropP…
jackylee-ch Sep 28, 2020
0740ef5
add ResolvedView analyzed
jackylee-ch Sep 28, 2020
ad40d7b
add ResolvePartitionSpec rule
jackylee-ch Sep 30, 2020
8fce669
change scala style
jackylee-ch Oct 19, 2020
af4b50b
change scala style
jackylee-ch Oct 19, 2020
b046909
fix scala-2.13 compile failed
jackylee-ch Oct 19, 2020
fbc2b58
add more implicit method
jackylee-ch Oct 19, 2020
dedb32a
remove unused imports
jackylee-ch Oct 21, 2020
41cf069
Merge branch 'master'
jackylee-ch Nov 6, 2020
96a62be
redefine ResolvePartitionSpec
jackylee-ch Nov 6, 2020
d5d8f13
fix test failed
jackylee-ch Nov 7, 2020
6bf49bd
add check in CheckAnalysis
jackylee-ch Nov 7, 2020
cdd7085
restart test
jackylee-ch Nov 9, 2020
0545538
change UnresolveTableOrView to UnresolvedTable
jackylee-ch Nov 10, 2020
f1fcac1
remove implicit class
jackylee-ch Nov 10, 2020
7014ba1
Merge branch 'master' into SPARK-32512-new
jackylee-ch Nov 10, 2020
69bbbd5
fix build failed
jackylee-ch Nov 10, 2020
effd0ed
Merge branch 'SPARK-32512-new' of https://github.com/stczwd/spark int…
jackylee-ch Nov 10, 2020
7377469
use ResolvedV1TableIdentifier
jackylee-ch Nov 10, 2020
c6711cd
fix suite test failed
jackylee-ch Nov 10, 2020
dcd5060
fix suite test failed
jackylee-ch Nov 10, 2020
d316e56
fix test failed
jackylee-ch Nov 11, 2020
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.types.StructType;

/**
* A partition interface of {@link Table}.
* A partition is composed of an identifier and properties,
* and the properties contain the metadata of the partition.
* <p>
* These APIs are used to modify a table's partition identifiers or partition metadata.
* In some cases, they will change the table data as well.
* {@link #createPartition}:
* add a partition and any data that its location contains to the table
* {@link #dropPartition}:
* remove a partition and any data it contains from the table
* {@link #replacePartitionMetadata}:
* point a partition to a new location, which will swap one location's data for the other
*
* @since 3.1.0
*/
@Experimental
public interface SupportsPartitions extends Table {

/**
* @return the partition schema of the table
*/
StructType partitionSchema();

/**
* Create a partition in the table.
*
* @param ident a new partition identifier
* @param properties the metadata of a partition
* @throws PartitionAlreadyExistsException If a partition already exists for the identifier
* @throws UnsupportedOperationException If partition properties are not supported
*/
void createPartition(
InternalRow ident,
Map<String, String> properties)
throws PartitionAlreadyExistsException, UnsupportedOperationException;

/**
* Drop a partition from the table.
*
* @param ident a partition identifier
* @return true if a partition was deleted, false if no partition exists for the identifier
*/
boolean dropPartition(InternalRow ident);

/**
* Replace the partition metadata of the existing partition.
*
* @param ident the partition identifier of the existing partition
* @param properties the new metadata of the partition
* @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
* @throws UnsupportedOperationException If partition properties are not supported
*/
void replacePartitionMetadata(
InternalRow ident,
Map<String, String> properties)
throws NoSuchPartitionException, UnsupportedOperationException;

/**
* Retrieve the partition metadata of the existing partition.
*
* @param ident a partition identifier
* @return the metadata of the partition
* @throws UnsupportedOperationException If partition properties are not supported
*/
Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that match the given identifier prefix.
*
* @param ident a prefix of the partition identifiers
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);
}
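
As an illustration of the new API (not part of this diff), here is a minimal Scala sketch of how a caller might exercise a table that implements SupportsPartitions; the single StringType partition column and the helper name are assumptions:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsPartitions, Table}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: add a date partition to any table that supports
// partition management. For a StringType partition column, the identifier
// value is stored as a UTF8String inside the InternalRow.
def addDatePartition(table: Table, date: String): Unit = table match {
  case partTable: SupportsPartitions =>
    val ident = InternalRow(UTF8String.fromString(date))
    if (partTable.listPartitionIdentifiers(ident).isEmpty) {
      partTable.createPartition(ident, new java.util.HashMap[String, String]())
    }
  case _ =>
    throw new UnsupportedOperationException(s"Table does not support partitions: ${table.name}")
}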
@@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType

/**
* Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
@@ -48,9 +50,17 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(message) {
class TempTableAlreadyExistsException(table: String)
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")

class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
extends AnalysisException(
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
def this(db: String, table: String, spec: TablePartitionSpec) = {
this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
}

def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
this(s"Partition already exists in table $tableName:" +
partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
.map( kv => s"${kv._1} -> ${kv._2}").mkString(","))
}
}

class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
extends AnalysisException(
Expand Down
@@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType


/**
@@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
}
}

class NoSuchPartitionException(
db: String,
table: String,
spec: TablePartitionSpec)
extends AnalysisException(
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
class NoSuchPartitionException(message: String) extends AnalysisException(message) {
def this(db: String, table: String, spec: TablePartitionSpec) = {
this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
}

def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
this(s"Partition not found in table $tableName: " +
s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")}")
}
}

class NoSuchPermanentFunctionException(db: String, func: String)
extends AnalysisException(s"Function '$func' not found in database '$db'")
@@ -228,6 +228,24 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case ShowCurrentNamespaceStatement() =>
ShowCurrentNamespace(catalogManager)

case AlterTableAddPartitionStatement(
NonSessionCatalogAndTable(catalog, tableName), partitionSpecsAndLocs, ifNotExists) =>
AlterTableAddPartition(
catalog.asTableCatalog,
tableName.asIdentifier,
partitionSpecsAndLocs,
ifNotExists)

case AlterTableDropPartitionStatement(
NonSessionCatalogAndTable(catalog, tableName), specs, ifExists, purge, retainData) =>
AlterTableDropPartition(
catalog.asTableCatalog,
tableName.asIdentifier,
specs,
ifExists,
purge,
retainData)
}

object NonSessionCatalogAndTable {
@@ -551,3 +551,35 @@ case class ShowFunctions(
pattern: Option[String]) extends Command {
override def children: Seq[LogicalPlan] = child.toSeq
}

/**
* The logical plan of the ALTER TABLE ADD PARTITION command that works for v2 tables.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
* PARTITION spec2 [LOCATION 'loc2']
Member: Is the second PARTITION mandatory? Can you use similar format as for DROP?

ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1'] [, PARTITION spec2 [LOCATION 'loc2'], ...];

Contributor Author: yeap, thanks
* }}}
*/
case class AlterTableAddPartition(
catalog: TableCatalog,
ident: Identifier,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
ignoreIfExists: Boolean) extends Command

/**
* The logical plan of the ALTER TABLE DROP PARTITION command that works for v2 tables.
* This may remove the data and metadata for this partition.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
case class AlterTableDropPartition(
catalog: TableCatalog,
ident: Identifier,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean) extends Command
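
As a usage sketch (not part of this diff), the two commands these plans back could be driven end-to-end as follows; the spark session, the v2 catalog name testcat, and the table name are assumptions:

// Hypothetical: testcat is a registered v2 catalog whose tables support
// partition management. ADD uses space-separated PARTITION clauses (per the
// syntax above); DROP uses comma-separated ones.
spark.sql("ALTER TABLE testcat.ns.tbl ADD IF NOT EXISTS " +
  "PARTITION (dt = '2020-01-01') LOCATION 'loc1' " +
  "PARTITION (dt = '2020-01-02') LOCATION 'loc2'")
spark.sql("ALTER TABLE testcat.ns.tbl DROP IF EXISTS " +
  "PARTITION (dt = '2020-01-01'), PARTITION (dt = '2020-01-02')")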
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsPartitions, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.types.{ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Implicits {
@@ -52,6 +55,15 @@
}
}

def asPartitionable: SupportsPartitions = {
table match {
case support: SupportsPartitions =>
support
case _ =>
throw new AnalysisException(s"Table does not support partitions: ${table.name}")
}
}

def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)

def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
@@ -62,4 +74,31 @@
new CaseInsensitiveStringMap(options.asJava)
}
}

def convertPartitionIdentifiers(
partSpec: TablePartitionSpec,
partSchema: StructType): InternalRow = {
Contributor: Why is this included with the implicits when it isn't an implicit class?

Contributor Author: Hm, a little ugly to me if it defined with implicits. I can change it if you think it's better with implicits.

Contributor: I don't think it needs to be implicit. I just don't think it belongs in the implicits class if it isn't an implicit. I think there is a util class you could include this in.

Contributor Author: Ah, I misunderstood. thanks.

val partValues = partSchema.map { part =>
part.dataType match {
case _: ByteType =>
partSpec.getOrElse(part.name, "0").toByte
Contributor: Conversion to InternalRow should not modify the partition values by filling in defaults. Filling in a default like this is a correctness bug.

I think this should require that all partition names are present in the map, and pass null if a name is present but does not have a value. If the partition doesn't allow null partition values, then it should throw an exception.

Contributor Author: sure. sounds reasonable to me.

case _: ShortType =>
partSpec.getOrElse(part.name, "0").toShort
case _: IntegerType =>
partSpec.getOrElse(part.name, "0").toInt
case _: LongType =>
partSpec.getOrElse(part.name, "0").toLong
case _: FloatType =>
partSpec.getOrElse(part.name, "0").toFloat
case _: DoubleType =>
partSpec.getOrElse(part.name, "0").toDouble
case _: StringType =>
partSpec.getOrElse(part.name, "")
case _ =>
throw new AnalysisException(
s"Type ${part.dataType.typeName} is not supported for partition.")
}
}
InternalRow.fromSeq(partValues)
}
}
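
Following the review thread above, a hedged sketch (illustrative only, not part of this commit) of the stricter conversion the reviewer suggests: every partition column must be named in the spec, and no defaults are filled in. strictConvertPartitionIdentifiers is a hypothetical name; beyond this file's imports it also needs import org.apache.spark.unsafe.types.UTF8String:

def strictConvertPartitionIdentifiers(
    partSpec: TablePartitionSpec,
    partSchema: StructType): InternalRow = {
  val partValues = partSchema.map { part =>
    // No silent defaults: a missing partition column is an error. (Whether a
    // present-but-valueless name should become null depends on how the spec
    // encodes it and on whether the table allows null partition values.)
    val raw = partSpec.getOrElse(part.name, throw new AnalysisException(
      s"Partition spec is missing a value for column: ${part.name}"))
    part.dataType match {
      case ByteType => raw.toByte
      case ShortType => raw.toShort
      case IntegerType => raw.toInt
      case LongType => raw.toLong
      case FloatType => raw.toFloat
      case DoubleType => raw.toDouble
      case StringType => UTF8String.fromString(raw)
      case dt => throw new AnalysisException(
        s"Type ${dt.typeName} is not supported for partitions.")
    }
  }
  InternalRow.fromSeq(partValues)
}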
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import java.util
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.SupportsPartitions
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType


/**
* This class is used to test the SupportsPartitions API.
*/
class InMemoryPartitionTable(
name: String,
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String])
extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitions {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
new ConcurrentHashMap[InternalRow, util.Map[String, String]]()

def partitionSchema: StructType = {
val partitionColumnNames = partitioning.toSeq.asPartitionColumns
new StructType(schema.filter(p => partitionColumnNames.contains(p.name)).toArray)
}

def createPartition(
ident: InternalRow,
properties: util.Map[String, String]): Unit = {
if (memoryTablePartitions.containsKey(ident)) {
throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
} else {
memoryTablePartitions.put(ident, properties)
}
}

def dropPartition(ident: InternalRow): Boolean = {
if (memoryTablePartitions.containsKey(ident)) {
memoryTablePartitions.remove(ident)
true
} else {
false
}
}

def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = {
if (memoryTablePartitions.containsKey(ident)) {
memoryTablePartitions.put(ident, properties)
} else {
throw new NoSuchPartitionException(name, ident, partitionSchema)
}
}

def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
if (memoryTablePartitions.containsKey(ident)) {
memoryTablePartitions.get(ident)
} else {
throw new NoSuchPartitionException(name, ident, partitionSchema)
}
}

def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = {
val prefixPartCols =
new StructType(partitionSchema.dropRight(partitionSchema.length - ident.numFields).toArray)
val prefixPart = ident.toSeq(prefixPartCols)
memoryTablePartitions.keySet().asScala
.filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray
}

def partitionExists(ident: InternalRow): Boolean = {
memoryTablePartitions.containsKey(ident)
}

def clearPartitions(): Unit = {
Contributor: We can remove it now.

Contributor Author: yes

memoryTablePartitions.clear()
}
}
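
To show how a suite might drive this test table, a minimal sketch (the names, the single "dt" partition column, and the use of Expressions.identity are assumptions, not part of this diff):

import java.util.Collections

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical table partitioned by a single string column "dt".
val table = new InMemoryPartitionTable(
  "test_table",
  new StructType().add("id", IntegerType).add("dt", StringType),
  Array(Expressions.identity("dt")),
  Collections.emptyMap[String, String]())

val ident = InternalRow(UTF8String.fromString("2020-01-01"))
table.createPartition(ident, Collections.emptyMap[String, String]())
assert(table.partitionExists(ident))
// An empty prefix matches every partition.
assert(table.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(table.dropPartition(ident))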