Merge master into this #4

Merged · 7 commits · Nov 7, 2023
@@ -16,5 +16,6 @@
 #
 
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.OptionsUriExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.StringURIExtractor

@@ -5,31 +5,40 @@
     "fieldExtractor" : "CatalogTableTableExtractor",
     "catalogDesc" : null
   } ],
-  "functionDescs" : [ ]
+  "functionDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
   "scanDescs" : [ {
     "fieldName" : "tableMeta",
     "fieldExtractor" : "CatalogTableTableExtractor",
     "catalogDesc" : null
   } ],
-  "functionDescs" : [ ]
+  "functionDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
   "scanDescs" : [ {
     "fieldName" : "catalogTable",
     "fieldExtractor" : "CatalogTableOptionTableExtractor",
     "catalogDesc" : null
   } ],
-  "functionDescs" : [ ]
+  "functionDescs" : [ ],
+  "uriDescs" : [ {
+    "fieldName" : "relation",
+    "fieldExtractor" : "BaseRelationFileIndexURIExtractor",
+    "actionTypeDesc" : null,
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
   "scanDescs" : [ {
     "fieldName" : null,
     "fieldExtractor" : "DataSourceV2RelationTableExtractor",
     "catalogDesc" : null
   } ],
-  "functionDescs" : [ ]
+  "functionDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.hive.HiveGenericUDF",
   "scanDescs" : [ ],
@@ -43,7 +52,8 @@
       "skipTypes" : [ "TEMP", "SYSTEM" ]
     },
     "isInput" : true
-  } ]
+  } ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.hive.HiveGenericUDTF",
   "scanDescs" : [ ],
@@ -57,7 +67,8 @@
       "skipTypes" : [ "TEMP", "SYSTEM" ]
     },
     "isInput" : true
-  } ]
+  } ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.hive.HiveSimpleUDF",
   "scanDescs" : [ ],
@@ -71,7 +82,8 @@
      "skipTypes" : [ "TEMP", "SYSTEM" ]
     },
     "isInput" : true
-  } ]
+  } ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.hive.HiveUDAFFunction",
   "scanDescs" : [ ],
@@ -85,5 +97,6 @@
      "skipTypes" : [ "TEMP", "SYSTEM" ]
     },
     "isInput" : true
-  } ]
+  } ],
+  "uriDescs" : [ ]
 } ]

@@ -1128,7 +1128,12 @@
   } ],
   "opType" : "LOAD",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "path",
+    "fieldExtractor" : "StringURIExtractor",
+    "actionTypeDesc" : null,
+    "isInput" : true
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.execution.command.RefreshTableCommand",
   "tableDescs" : [ {
@@ -2029,6 +2034,28 @@
   "opType" : "QUERY",
   "queryDescs" : [ ],
   "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.delta.commands.MergeIntoCommand",
+  "tableDescs" : [ {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "source",
+    "fieldExtractor" : "LogicalPlanQueryExtractor"
+  } ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
   "tableDescs" : [ {

@@ -113,7 +113,15 @@ object PrivilegesBuilder {
         buildQuery(a.child, privilegeObjects, projectionList, cols, spark)
 
       case scan if isKnownScan(scan) && scan.resolved =>
-        getScanSpec(scan).tables(scan, spark).foreach(mergeProjection(_, scan))
+        val tables = getScanSpec(scan).tables(scan, spark)
+        // If the scan is table-based, we check privileges on the table we found;
+        // otherwise, we check privileges on the URI we found.
+        if (tables.nonEmpty) {
+          tables.foreach(mergeProjection(_, scan))
+        } else {
+          getScanSpec(scan).uris(scan).foreach(
+            privilegeObjects += PrivilegeObject(_, PrivilegeObjectActionType.OTHER))
+        }
 
       case u if u.nodeName == "UnresolvedRelation" =>
         val parts = invokeAs[String](u, "tableName").split("\\.")
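
Reviewer note: this new branch in buildQuery is the core behavioral change of this diff. When a scan resolves to catalog tables, privileges are checked on those tables as before; when it resolves to none (e.g. a direct-path read such as SELECT * FROM parquet.`/some/path`), the URIs extracted from the scan are authorized instead. A minimal, self-contained sketch of that routing, using simplified stand-in types rather than the plugin's actual classes:

case class Table(name: String)
case class Uri(path: String)
case class PrivilegeObject(resource: String)

// uris is by-name so it is only evaluated when the table lookup comes up empty.
def scanPrivileges(tables: Seq[Table], uris: => Seq[Uri]): Seq[PrivilegeObject] =
  if (tables.nonEmpty) {
    // Table-backed scan: authorize the catalog tables that were found.
    tables.map(t => PrivilegeObject(t.name))
  } else {
    // Path-backed scan: fall back to the URIs from the relation's file index.
    uris.map(u => PrivilegeObject(u.path))
  }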

@@ -101,7 +101,8 @@ case class TableCommandSpec(
 case class ScanSpec(
     classname: String,
     scanDescs: Seq[ScanDesc],
-    functionDescs: Seq[FunctionDesc] = Seq.empty) extends CommandSpec {
+    functionDescs: Seq[FunctionDesc] = Seq.empty,
+    uriDescs: Seq[UriDesc] = Seq.empty) extends CommandSpec {
   override def opType: String = OperationType.QUERY.toString
   def tables: (LogicalPlan, SparkSession) => Seq[Table] = (plan, spark) => {
     scanDescs.flatMap { td =>
@@ -115,6 +116,18 @@
     }
   }
 
+  def uris: LogicalPlan => Seq[Uri] = plan => {
+    uriDescs.flatMap { ud =>
+      try {
+        ud.extract(plan)
+      } catch {
+        case e: Exception =>
+          LOG.debug(ud.error(plan, e))
+          None
+      }
+    }
+  }
+
   def functions: (Expression) => Seq[Function] = (expr) => {
     functionDescs.flatMap { fd =>
       try {
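
Reviewer note: uris deliberately mirrors the existing tables and functions lookups: extraction is best-effort, and a descriptor that cannot be applied to a given plan is logged at debug level and skipped rather than failing authorization. The same pattern, distilled into a dependency-free sketch (generic names, not the plugin's API):

// Best-effort extraction: apply every descriptor, drop the ones that throw.
def bestEffort[D, T](descs: Seq[D])(extract: D => Seq[T]): Seq[T] =
  descs.flatMap { d =>
    try extract(d)
    catch {
      case e: Exception =>
        // The plugin routes this to LOG.debug; println keeps the sketch standalone.
        println(s"skipping descriptor $d: ${e.getMessage}")
        Nil
    }
  }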

@@ -18,6 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.serde
 
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 
 trait URIExtractor extends (AnyRef => Seq[Uri]) with Extractor
 
@@ -47,3 +48,12 @@ class OptionsUriExtractor extends URIExtractor {
     v1.asInstanceOf[Map[String, String]].get("path").map(Uri).toSeq
   }
 }
+
+class BaseRelationFileIndexURIExtractor extends URIExtractor {
+  override def apply(v1: AnyRef): Seq[Uri] = {
+    v1 match {
+      case h: HadoopFsRelation => h.location.rootPaths.map(_.toString).map(Uri)
+      case _ => Nil
+    }
+  }
+}
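
Reviewer note: the new extractor only understands HadoopFsRelation, reporting every root path of its FileIndex as an authorizable URI; any other BaseRelation yields Nil, so scans that are neither table-backed nor file-backed simply contribute no privilege objects. A toy model of the same contract, with simplified stand-ins (not Spark's real HadoopFsRelation):

case class Uri(path: String)

// Stand-in for HadoopFsRelation: a relation whose file index exposes root paths.
case class FileBackedRelation(rootPaths: Seq[String])

class FileBackedRelationURIExtractor extends (AnyRef => Seq[Uri]) {
  override def apply(v1: AnyRef): Seq[Uri] = v1 match {
    case r: FileBackedRelation => r.rootPaths.map(Uri)
    case _ => Nil // non-file-based relations expose no URIs
  }
}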

@@ -1430,17 +1430,24 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
       .queryExecution.analyzed
     val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === LOAD)
-    assert(in.isEmpty)
-
-    assert(out.size === 1)
-    val po0 = out.head
-    assert(po0.actionType === PrivilegeObjectActionType.INSERT_OVERWRITE)
-    assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-    assertEqualsIgnoreCase(reusedDb)(po0.dbname)
-    assert(po0.objectName equalsIgnoreCase tableName.split("\\.").last)
+    assert(in.size === 1)
+    val po0 = in.head
+    assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po0.privilegeObjectType === PrivilegeObjectType.DFS_URL)
+    assert(po0.dbname === dataPath)
+    assert(po0.objectName === null)
     assert(po0.columns.isEmpty)
-    checkTableOwner(po0)
-    val accessType0 = ranger.AccessType(po0, operationType, isInput = false)
+
+    assert(out.size === 1)
+    val po1 = out.head
+    assert(po1.actionType === PrivilegeObjectActionType.INSERT_OVERWRITE)
+    assert(po1.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assertEqualsIgnoreCase(reusedDb)(po1.dbname)
+    assert(po1.objectName equalsIgnoreCase tableName.split("\\.").last)
+    assert(po1.columns.isEmpty)
+    checkTableOwner(po1)
+    val accessType0 = ranger.AccessType(po1, operationType, isInput = false)
     assert(accessType0 === AccessType.UPDATE)
   }
 }

@@ -42,7 +42,19 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {
     DeleteCommand.copy(classname = cmd)
   }
 
+  val MergeIntoCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc = TableDesc(
+      "target",
+      classOf[SubqueryAliasTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    val queryDesc = QueryDesc("source")
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
+  }
+
   override def specs: Seq[TableCommandSpec] = Seq(
     DeleteCommand,
+    MergeIntoCommand,
     UpdateCommand)
 }
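
Reviewer note: the MergeIntoCommand JSON block added to table_command_spec.json earlier in this diff is the serialized counterpart of this spec — the same SubqueryAliasTableExtractor on the target, the same UPDATE action type, and the same source query descriptor — so the two changes should be read (and kept in sync) together.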

@@ -37,7 +37,8 @@ object Scans extends CommandSpecs[ScanSpec] {
       ScanDesc(
         "catalogTable",
        classOf[CatalogTableOptionTableExtractor])
-    ScanSpec(r, Seq(tableDesc))
+    val uriDesc = UriDesc("relation", classOf[BaseRelationFileIndexURIExtractor])
+    ScanSpec(r, Seq(tableDesc), uriDescs = Seq(uriDesc))
   }
 
   val DataSourceV2Relation = {

@@ -592,7 +592,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       fieldName = "table",
       columnDesc = Some(columnDesc),
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), "LOAD")
+    val uriDesc = UriDesc("path", classOf[StringURIExtractor], isInput = true)
+    TableCommandSpec(cmd, Seq(tableDesc), LOAD, uriDescs = Seq(uriDesc))
   }
 
   val RefreshTable = {

@@ -221,6 +221,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     }
   }
+
+  test("merge into table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table1", "table"),
+        (s"$namespace1.$table2", "table"),
+        (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+
+        val mergeIntoSql =
+          s"""
+             |MERGE INTO $namespace1.$table1 AS target
+             |USING $namespace1.$table2 AS source
+             |ON target.id = source.id
+             |WHEN MATCHED THEN
+             |  UPDATE SET
+             |    id = source.id,
+             |    name = source.name,
+             |    gender = source.gender,
+             |    birthDate = source.birthDate
+             |WHEN NOT MATCHED
+             |  THEN INSERT (
+             |    id,
+             |    name,
+             |    gender,
+             |    birthDate
+             |  )
+             |  VALUES (
+             |    source.id,
+             |    source.name,
+             |    source.gender,
+             |    source.birthDate
+             |  )
+             |""".stripMargin
+        interceptContains[AccessControlException](
+          doAs(someone, sql(mergeIntoSql)))(
+          s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
+            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+            s" [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(mergeIntoSql))
+      }
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {

@@ -1097,4 +1097,62 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       }
     }
   }
+
+  test("HadoopFsRelation") {
+    val db1 = defaultDb
+    val table1 = "table1"
+    withTempDir { path =>
+      withSingleCallEnabled {
+        withCleanTmpResources(Seq((s"$db1.$table1", "table"))) {
+          doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
+          doAs(
+            admin,
+            sql(
+              s"""
+                 |INSERT OVERWRITE DIRECTORY '$path'
+                 |USING parquet
+                 |SELECT * FROM $db1.$table1""".stripMargin))
+
+          interceptContains[AccessControlException](
+            doAs(
+              someone,
+              sql(
+                s"""
+                   |INSERT OVERWRITE DIRECTORY '$path'
+                   |USING parquet
+                   |SELECT * FROM $db1.$table1""".stripMargin)))(
+            s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope], " +
+              s"[update] privilege on [[$path, $path/]]")
+
+          doAs(admin, sql(s"SELECT * FROM parquet.`$path`".stripMargin).explain(true))
+          interceptContains[AccessControlException](
+            doAs(someone, sql(s"SELECT * FROM parquet.`$path`".stripMargin).explain(true)))(
+            s"does not have [select] privilege on " +
+              s"[[file:$path, file:$path/]]")
+        }
+      }
+    }
+  }
+
+  test("LoadDataCommand") {
+    val db1 = defaultDb
+    val table1 = "table1"
+    withSingleCallEnabled {
+      withTempDir { path =>
+        withCleanTmpResources(Seq((s"$db1.$table1", "table"))) {
+          doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
+          val loadDataSql =
+            s"""
+               |LOAD DATA LOCAL INPATH '$path'
+               |OVERWRITE INTO TABLE $db1.$table1
+               |""".stripMargin
+          doAs(admin, sql(loadDataSql).explain(true))
+          interceptContains[AccessControlException](
+            doAs(someone, sql(loadDataSql).explain(true)))(
+            s"does not have [select] privilege on " +
+              s"[[$path, $path/]]")
+        }
+      }
+    }
+  }
 }

kyuubi-server/web-ui/.env.production (2 changes: 0 additions & 2 deletions)
@@ -13,6 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-NODE_ENV=production
-
 VITE_APP_DEV_WEB_URL='/'