fix predicate push-down error when querying app_logs with line_num, and make line_num start from 1
naive-zhang committed Dec 21, 2024
1 parent f9f1fb2 commit b2ccf50
Showing 6 changed files with 46 additions and 33 deletions.
@@ -123,9 +123,9 @@ class YarnAppPartitionReader(yarnAppPartition: YarnAppPartition)
 
   private def maybeFilter(app: ApplicationReport, filter: Filter): Boolean = {
     filter match {
-      case EqualTo("id", appId: String) => app.getApplicationId.toString eq appId
-      case EqualTo("state", appState: String) => app.getYarnApplicationState.name() eq appState
-      case EqualTo("type", appType: String) => app.getApplicationType eq appType
+      case EqualTo("id", appId: String) => app.getApplicationId.toString.equals(appId)
+      case EqualTo("state", appState: String) => app.getYarnApplicationState.name().equals(appState)
+      case EqualTo("type", appType: String) => app.getApplicationType.equals(appType)
       case In("state", states) => states.map(x => x.toString)
         .contains(app.getYarnApplicationState.name())
       case In("type", types) => types.map(x => x.toString)
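The hunk above fixes a classic Scala pitfall: `eq` compares object identity, not contents, so filters on id, state, and type could silently match nothing whenever the two strings were distinct objects. A minimal sketch of the difference (the application id is taken from the log paths later in this diff):

object EqVsEquals extends App {
  // `eq` is reference equality; `equals` compares the characters.
  val fromReport = new String("application_1734531705578_0001")
  val fromFilter = "application_1734531705578_0001"

  println(fromReport eq fromFilter)      // false: two distinct objects
  println(fromReport.equals(fromFilter)) // true: same contents
}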
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.spark.connector.yarn
 
 import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.sources.{EqualTo, Filter}
+import org.apache.spark.sql.sources.{EqualTo, Filter, In}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -33,8 +33,15 @@ case class YarnAppScanBuilder(options: CaseInsensitiveStringMap, schema: StructT
     val (supportedFilter, unsupportedFilter) = filters.partition {
       case filter: EqualTo =>
         filter match {
-          case EqualTo("app_id", _) => true
-          case EqualTo("user", _) => true
+          case EqualTo("id", _) => true
+          case EqualTo("state", _) => true
+          case EqualTo("type", _) => true
           case _ => false
         }
+      case filter: In =>
+        filter match {
+          case In("state", _) => true
+          case In("type", _) => true
+          case _ => false
+        }
       case _ => false
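For context, a hedged sketch of the DataSource V2 contract the builder above implements: pushFilters keeps the predicates the source can evaluate and hands the rest back to Spark, which re-applies them on top of the scan. The class name here is illustrative; the case arms mirror the diff:

import org.apache.spark.sql.connector.read.SupportsPushDownFilters
import org.apache.spark.sql.sources.{EqualTo, Filter, In}

abstract class SketchScanBuilder extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case EqualTo("id" | "state" | "type", _) => true
      case In("state" | "type", _) => true
      case _ => false // everything else stays with Spark
    }
    pushed = supported
    unsupported // Spark evaluates these after the scan
  }

  override def pushedFilters(): Array[Filter] = pushed
}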
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.IOUtils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.sources.{EqualTo, Filter}
 import org.apache.spark.unsafe.types.UTF8String
 
 class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
@@ -52,8 +53,7 @@ class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition)
   override def close(): Unit = {}
 
   /**
-   * fet log
-   * * hadoop3:
+   * fetch log
    * * /tmp/logs/xxx/bucket-xxx-tfile/0001/application_1734531705578_0001/localhost_32422
    * * /tmp/logs/xxx/bucket-logs-tfile/0001/application_1734530210878_0001/localhost_24232
    *
@@ -82,7 +82,7 @@
     val inputStream = fs.open(path)
     val reader = new BufferedReader(new InputStreamReader(inputStream))
     var line: String = null
-    var lineNumber: Int = 1
+    var lineNumber: Int = 0
     val logEntries = new ArrayBuffer[LogEntry]()
     try {
       while ({
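The initializer moves from 1 to 0. Assuming the elided loop body increments lineNumber before each entry is built (the increment sits outside this hunk), the first emitted line now gets line_num = 1, matching the commit title. A toy sketch of that numbering:

var lineNumber: Int = 0
val numbered = Seq("first", "second").map { line =>
  lineNumber += 1    // increment before use
  (lineNumber, line) // (1, "first"), (2, "second")
}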
@@ -99,14 +99,25 @@
             path.toUri.getPath,
             line)
         }
-        logEntries
+        logEntries.filter(entry =>
+          yarnLogPartition.filters.forall(filter =>
+            maybeFilter(entry, filter)))
       } finally {
         IOUtils.closeStream(inputStream)
         reader.close()
       }
     case _ => Seq.empty
     }
   }
+
+  private def maybeFilter(entry: LogEntry, filter: Filter): Boolean = {
+    filter match {
+      case EqualTo("app_id", appId: String) => entry.appId.equals(appId)
+      case EqualTo("container_id", containerId: String) => entry.containerId.equals(containerId)
+      case EqualTo("user", user: String) => entry.user.equals(user)
+      case _ => false
+    }
+  }
 }
 
 // Helper class to represent log entries
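The reader now evaluates every pushed filter as a conjunction via forall. A self-contained sketch of that evaluation, with a trimmed-down LogEntry (the real helper class carries more fields):

import org.apache.spark.sql.sources.{EqualTo, Filter}

object ConjunctiveFilterSketch extends App {
  case class LogEntry(appId: String, containerId: String, user: String)

  def maybeFilter(entry: LogEntry, filter: Filter): Boolean = filter match {
    case EqualTo("app_id", appId: String) => entry.appId.equals(appId)
    case EqualTo("container_id", id: String) => entry.containerId.equals(id)
    case EqualTo("user", user: String) => entry.user.equals(user)
    case _ => false
  }

  val filters: Seq[Filter] = Seq(EqualTo("user", "zhangxinsen"))
  val entries = Seq(
    LogEntry("application_1734530210878_0001", "container_01", "zhangxinsen"),
    LogEntry("application_1734530210878_0001", "container_02", "someone_else"))

  // An entry survives only if every pushed filter accepts it.
  entries.filter(e => filters.forall(f => maybeFilter(e, f))).foreach(println)
}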
@@ -17,8 +17,6 @@
 
 package org.apache.kyuubi.spark.connector.yarn
 
-import scala.collection.mutable
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.spark.sql.SparkSession
@@ -27,6 +25,8 @@ import org.apache.spark.sql.sources.{EqualTo, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+import scala.collection.mutable
+
 case class YarnLogScan(
     options: CaseInsensitiveStringMap,
     schema: StructType,
@@ -58,18 +58,16 @@ case class YarnLogScan(
     val fs = FileSystem.get(hadoopConf)
     val path = new Path(pathStr)
     val logFiles = mutable.ArrayBuffer[FileStatus]()
-    if (fs.exists(path)) {
-      val fileStatuses: Array[FileStatus] = fs.globStatus(path)
-      if (fileStatuses != null && fileStatuses.nonEmpty) {
-        fileStatuses.foreach {
-          case status if status.isFile => logFiles += status
-          case status if status.isDirectory =>
-            val fileIterator = fs.listFiles(status.getPath, true)
-            while (fileIterator.hasNext) {
-              val fileStatus = fileIterator.next()
-              if (fileStatus.isFile) logFiles += fileStatus
-            }
-        }
-      }
+    val fileStatuses: Array[FileStatus] = fs.globStatus(path)
+    if (fileStatuses != null && fileStatuses.nonEmpty) {
+      fileStatuses.foreach {
+        case status if status.isFile => logFiles += status
+        case status if status.isDirectory =>
+          val fileIterator = fs.listFiles(status.getPath, true)
+          while (fileIterator.hasNext) {
+            val fileStatus = fileIterator.next()
+            if (fileStatus.isFile) logFiles += fileStatus
+          }
+      }
     }
     fs.close()
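Dropping the fs.exists(path) guard is safe because Hadoop's globStatus already signals a missing path: it returns null (or an empty array) when nothing matches, which the null/empty check retained above covers, and skipping exists() saves a filesystem round trip. A small sketch (the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobWithoutExists extends App {
  val fs = FileSystem.get(new Configuration())
  // null or empty when the pattern matches nothing
  val statuses = fs.globStatus(new Path("/no/such/dir/*"))
  println(statuses == null || statuses.isEmpty) // true
  fs.close()
}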
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.spark.connector.yarn
 
 import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.sources.{EqualTo, Filter, In}
+import org.apache.spark.sql.sources.{EqualTo, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -33,14 +33,10 @@ case class YarnLogScanBuilder(options: CaseInsensitiveStringMap, schema: StructT
     val (supportedFilter, unsupportedFilter) = filters.partition {
       case filter: EqualTo =>
         filter match {
-          case EqualTo("id", _) => true
-          case EqualTo("state", _) => true
-          case EqualTo("type", _) => true
-        }
-      case filter: In =>
-        filter match {
-          case In("state", _) => true
-          case In("type", _) => true
+          case EqualTo("app_id", _) => true
+          case EqualTo("user", _) => true
+          case EqualTo("container_id", _) => true
+          case _ => false
         }
       case _ => false
     }
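This hunk is the heart of the fix. The old inner matches had no default case, so any pushed predicate they did not list, such as EqualTo("line_num", 10), raised scala.MatchError during pushdown; the rewrite also aligns the pushable columns with the app_logs schema (app_id, user, container_id) rather than the apps columns (id, state, type). A minimal reproduction of the old failure mode:

import org.apache.spark.sql.sources.{EqualTo, Filter}

def supportedOld(filter: Filter): Boolean = filter match {
  case EqualTo("id", _) => true
  case EqualTo("state", _) => true
  case EqualTo("type", _) => true
  // no `case _ => false`: unlisted filters fall through
}

// supportedOld(EqualTo("line_num", 10)) // throws scala.MatchError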
@@ -70,6 +70,7 @@ class YarnLogQuerySuite extends SparkYarnConnectorWithYarn {
     spark.sql("USE yarn")
     val rows = spark.sql(
       "select * from yarn.default.app_logs where line_num = 10" +
+        " and user='zhangxinsen'" +
         " limit 10").collect()
     val host = rows.head.getString(2)
     assert(host == "localhost")
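The updated test pairs a predicate the connector pushes down (user) with one it rejects (line_num), exercising both paths in one query. An equivalent DataFrame-API probe, assuming the same SparkSession and yarn catalog as the suite:

// `user` is pushed to the connector; `line_num` is evaluated by
// Spark on the rows the scan returns.
spark.table("yarn.default.app_logs")
  .filter("user = 'zhangxinsen' and line_num = 10")
  .limit(10)
  .show()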
