From f9f1fb201e4484f37828c9be0f11c2d3476a1b8d Mon Sep 17 00:00:00 2001
From: native-zhang
Date: Sat, 21 Dec 2024 14:40:38 +0800
Subject: [PATCH] fix predicate pushdown errors when querying app_logs with line_num

---
 .../yarn/YarnAppPartitionReader.scala         | 51 ++++++++++++-------
 .../connector/yarn/YarnAppScanBuilder.scala   | 15 ++++++
 .../connector/yarn/YarnLogPartition.scala     |  4 +-
 .../yarn/YarnLogPartitionReader.scala         |  3 +-
 .../spark/connector/yarn/YarnLogScan.scala    | 10 ++--
 .../connector/yarn/YarnLogScanBuilder.scala   | 14 ++++-
 .../WithKyuubiServerAndYarnMiniCluster.scala  |  6 +++
 .../connector/yarn/YarnAppQuerySuite.scala    |  1 -
 .../connector/yarn/YarnLogQuerySuite.scala    | 10 ++--
 9 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala
index 6b06ce7252c..936fb9b11bb 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.sources.{EqualTo, In}
+import org.apache.spark.sql.sources.{EqualTo, Filter, In}
 import org.apache.spark.unsafe.types.UTF8String
 
 class YarnAppPartitionReader(yarnAppPartition: YarnAppPartition)
@@ -98,26 +98,41 @@ class YarnAppPartitionReader(yarnAppPartition: YarnAppPartition)
         case _ => yarnClient.getApplications()
       }.get
     }
-
-    val appSeq = applicationReports.asScala.map(app => {
-      YarnApplication(
-        id = app.getApplicationId.toString,
-        appType = app.getApplicationType,
-        user = app.getUser,
-        name = app.getName,
-        state = app.getYarnApplicationState.name,
-        queue = app.getQueue,
-        attemptId = app.getCurrentApplicationAttemptId.toString,
-        submitTime = app.getSubmitTime,
-        launchTime = app.getLaunchTime,
-        startTime = app.getStartTime,
-        finishTime = app.getFinishTime,
-        trackingUrl = app.getTrackingUrl,
-        originalTrackingUrl = app.getOriginalTrackingUrl)
-    })
+    val appSeq = applicationReports.asScala.filter(app =>
+      yarnAppPartition.filters
+        .forall(filter => maybeFilter(app, filter)))
+      .map(app => {
+        YarnApplication(
+          id = app.getApplicationId.toString,
+          appType = app.getApplicationType,
+          user = app.getUser,
+          name = app.getName,
+          state = app.getYarnApplicationState.name,
+          queue = app.getQueue,
+          attemptId = app.getCurrentApplicationAttemptId.toString,
+          submitTime = app.getSubmitTime,
+          launchTime = app.getLaunchTime,
+          startTime = app.getStartTime,
+          finishTime = app.getFinishTime,
+          trackingUrl = app.getTrackingUrl,
+          originalTrackingUrl = app.getOriginalTrackingUrl)
+      })
     yarnClient.close()
     appSeq
   }
+
+  private def maybeFilter(app: ApplicationReport, filter: Filter): Boolean = {
+    filter match {
+      // compare string values with ==, not reference equality (eq)
+      case EqualTo("id", appId: String) => app.getApplicationId.toString == appId
+      case EqualTo("state", appState: String) => app.getYarnApplicationState.name() == appState
+      case EqualTo("type", appType: String) => app.getApplicationType == appType
+      case In("state", states) => states.map(x => x.toString)
+        .contains(app.getYarnApplicationState.name())
+      case In("type", types) => types.map(x => x.toString)
+        .contains(app.getApplicationType)
+      case _ => false
+    }
+  }
 }
 
 // Helper class to represent app
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala
index 3af6112354b..cc256f5d64b 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala
@@ -18,6 +18,7 @@ package org.apache.kyuubi.spark.connector.yarn
 
 import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.sources.{EqualTo, Filter, In}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -27,4 +28,18 @@ case class YarnAppScanBuilder(options: CaseInsensitiveStringMap, schema: StructT
   override def build(): Scan = {
     YarnAppScan(options, schema, pushed)
   }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    // push down only the predicates the app reader (maybeFilter) can evaluate
+    val (supportedFilter, unsupportedFilter) = filters.partition {
+      case EqualTo("id", _) => true
+      case EqualTo("state", _) => true
+      case EqualTo("type", _) => true
+      case In("state", _) => true
+      case In("type", _) => true
+      case _ => false
+    }
+    pushed = supportedFilter
+    unsupportedFilter
+  }
 }
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala
index 7c9a0683693..918d774ab00 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala
@@ -18,9 +18,11 @@ package org.apache.kyuubi.spark.connector.yarn
 
 import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.sources.Filter
 
 case class YarnLogPartition(
     hadoopConfMap: Map[String, String],
     logPath: String,
-    remoteAppLogDir: String)
+    remoteAppLogDir: String,
+    filters: Array[Filter])
   extends InputPartition
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala
index bedb90f2bac..fcdaafb15b3 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala
@@ -96,14 +96,13 @@ class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
             s"${containerHost}_${containerSuffix}",
             containerHost,
             lineNumber,
-            path.getName,
+            path.toUri.getPath,
             line)
         }
         logEntries
       } finally {
         IOUtils.closeStream(inputStream)
         reader.close()
-        fs.close()
       }
     case _ => Seq.empty
   }
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala
index 277992b80e5..ddb074b6c5f 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala
@@ -89,9 +89,13 @@ case class YarnLogScan(
       case pushed if pushed.isEmpty => listFiles(remoteAppLogDir)
       case pushed => pushed.collectFirst {
         case EqualTo("app_id", appId: String) =>
-          listFiles(s"${remoteAppLogDir}/*/*/*/${appId}")
+          listFiles(s"${remoteAppLogDir}/*/*/*/${appId}") ++
+            // compatible with the Hadoop 2 aggregated-log directory layout
+            listFiles(s"${remoteAppLogDir}/*/*/${appId}")
         case EqualTo("container_id", containerId: String) =>
-          listFiles(s"${remoteAppLogDir}/*/*/*/*/${containerId}")
+          listFiles(s"${remoteAppLogDir}/*/*/*/*/${containerId}") ++
+            // compatible with the Hadoop 2 aggregated-log directory layout
+            listFiles(s"${remoteAppLogDir}/*/*/*/${containerId}")
         case EqualTo("user", user: String) => listFiles(s"${remoteAppLogDir}/${user}")
         case _ => listFiles(remoteAppLogDir)
       }.get
@@ -101,7 +105,7 @@ case class YarnLogScan(
   override def planInputPartitions(): Array[InputPartition] = {
     // get file nums and construct nums inputPartition
     tryPushDownPredicates().map(fileStatus => {
-      YarnLogPartition(hadoopConfMap, fileStatus.getPath.toString, remoteAppLogDir)
+      YarnLogPartition(hadoopConfMap, fileStatus.getPath.toString, remoteAppLogDir, filters)
     }).toArray
   }
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala
index 57c6679414c..708dbf8482a 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala
@@ -18,7 +18,7 @@ package org.apache.kyuubi.spark.connector.yarn
 
 import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{EqualTo, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -31,7 +31,17 @@ case class YarnLogScanBuilder(options: CaseInsensitiveStringMap, schema: StructT
 
   override def pushFilters(filters: Array[Filter]): Array[Filter] = {
     val (supportedFilter, unsupportedFilter) = filters.partition {
-      case _: org.apache.spark.sql.sources.EqualTo => true
+      // only predicates the scan can prune log paths on are pushed down;
+      // anything else (e.g. host, line_num) is left for Spark to evaluate
+      case EqualTo("app_id", _) => true
+      case EqualTo("container_id", _) => true
+      case EqualTo("user", _) => true
       case _ => false
     }
     pushed = supportedFilter
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala
index 78c11badfc9..2e53eea1caf 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.spark.connector.yarn
 import java.io.{File, FileWriter}
 import java.util.Collections
 
+import scala.util.Random
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, ContainerLaunchContext, Resource, YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
@@ -34,6 +36,8 @@ import org.apache.kyuubi.util.JavaUtils
 
 trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with WithKyuubiServer {
 
+  private val taskTypeSet: Set[String] = Set("TYPE_1", "TYPE_2", "TYPE_3")
+
   override protected val conf: KyuubiConf = new KyuubiConf(false)
 
   val kyuubiHome: String = JavaUtils.getCodeSourceLocation(getClass).split("extensions").head
@@ -154,6 +158,8 @@ trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with WithKyuubiS
       .getApplicationSubmissionContext.getApplicationId
     appContext.setApplicationId(applicationId)
     appContext.setApplicationName("TestApp")
+    // randomly pick one of the predefined application types
+    appContext.setApplicationType(taskTypeSet.toSeq(Random.nextInt(taskTypeSet.size)))
 
     // Set up container launch context (e.g., commands to execute)
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala
index 1b79e3d15ee..6dfe4402885 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala
@@ -190,5 +190,4 @@ class YarnAppQuerySuite extends SparkYarnConnectorWithYarn {
       yarnClient.close()
     }
   }
-
 }
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala
index 0599e18dbb4..ee894501002 100644
--- a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala
@@ -42,7 +42,9 @@ class YarnLogQuerySuite extends SparkYarnConnectorWithYarn {
     withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
       spark.sql("USE yarn")
       val cnt = spark.sql(
-        "select count(1) from yarn.default.app_logs where host='localhost'").collect().head.getLong(
+        "select count(1) from yarn.default.app_logs " +
+          "where (host='localhost' or host like '%host') and " +
+          "app_id like '%application%'").collect().head.getLong(
         0)
       assert(cnt > 0)
     }
@@ -66,8 +68,10 @@ class YarnLogQuerySuite extends SparkYarnConnectorWithYarn {
     })
     withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
       spark.sql("USE yarn")
-      val host = spark.sql(
-        "select * from yarn.default.app_logs limit 10").collect().head.getString(2)
+      val rows = spark.sql(
+        "select * from yarn.default.app_logs where line_num = 10" +
+          " limit 10").collect()
+      val host = rows.head.getString(2)
       assert(host == "localhost")
     }
   }
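
For context, a minimal, self-contained sketch of the contract this patch sets up for the apps table: YarnAppScanBuilder pushes only EqualTo on id/state/type and In on state/type, and the reader keeps a row only when every pushed filter matches. AppInfo and the matches helper below are hypothetical stand-ins for ApplicationReport and maybeFilter, written for illustration only:

  import org.apache.spark.sql.sources.{EqualTo, Filter, In}

  object PushdownSketch extends App {
    // Hypothetical stand-in for org.apache.hadoop.yarn.api.records.ApplicationReport.
    case class AppInfo(id: String, state: String, appType: String)

    // Mirrors maybeFilter above: value equality (==), never reference equality (eq).
    def matches(app: AppInfo, filter: Filter): Boolean = filter match {
      case EqualTo("id", appId: String) => app.id == appId
      case EqualTo("state", appState: String) => app.state == appState
      case EqualTo("type", appType: String) => app.appType == appType
      case In("state", states) => states.map(_.toString).contains(app.state)
      case In("type", types) => types.map(_.toString).contains(app.appType)
      case _ => false
    }

    val apps = Seq(
      AppInfo("application_0001", "RUNNING", "SPARK"),
      AppInfo("application_0002", "FINISHED", "TYPE_1"))

    // A row survives only if every pushed filter matches, as in the reader's forall.
    val pushed: Array[Filter] = Array(In("state", Array[Any]("RUNNING", "ACCEPTED")))
    assert(apps.filter(app => pushed.forall(matches(app, _))).map(_.id) == Seq("application_0001"))
  }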