[SPARK-33955][SS] Add latest offsets to source progress
### What changes were proposed in this pull request?

This patch proposes to add the latest offset to the source progress of streaming queries.

### Why are the changes needed?

Currently we record only the start and end offsets per source in the streaming progress. The latest offset is an important piece of information for a streaming query, but the progress lacks it. It can be used to track the processing lag and to tune streaming queries, so we should add the latest offset to the source progress.

### Does this PR introduce _any_ user-facing change?

Yes, a new `latestOffset` metric is added to the source progress.
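
For users, the new field shows up on `SourceProgress` next to the existing offsets. A minimal sketch of reading it from a running query (the `query` handle is assumed to be an already started `StreamingQuery`):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Print end vs. latest offsets per source; lastProgress may be null before the first trigger.
def printSourceOffsets(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { progress =>
    progress.sources.foreach { s =>
      // startOffset/endOffset/latestOffset are JSON strings and may be null if unknown.
      println(s"source       = ${s.description}")
      println(s"endOffset    = ${s.endOffset}")
      println(s"latestOffset = ${s.latestOffset}")
    }
  }
}
```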

### How was this patch tested?

Unit tests. Also tested manually in a Spark cluster; an excerpt of the resulting source progress:

```
    "description" : "KafkaV2[Subscribe[page_view_events]]",
    "startOffset" : {
      "page_view_events" : {
        "2" : 582370921,
        "4" : 391910836,
        "1" : 631009201,
        "3" : 406601346,
        "0" : 195799112
      }
    },
    "endOffset" : {
      "page_view_events" : {
        "2" : 583764414,
        "4" : 392338002,
        "1" : 632183480,
        "3" : 407101489,
        "0" : 197304028
      }
    },
    "latestOffset" : {
      "page_view_events" : {
        "2" : 589852545,
        "4" : 394204277,
        "1" : 637313869,
        "3" : 409286602,
        "0" : 203878962
      }
    },
    "numInputRows" : 4999997,
    "inputRowsPerSecond" : 29287.70501405811,
```
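
As a hedged illustration (not part of this patch) of how the new field could be consumed, the per-partition lag for a Kafka source can be derived by comparing `endOffset` with `latestOffset`. The JSON layout of topic -> partition -> offset is the one shown above, and the helper below is hypothetical:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.sql.streaming.SourceProgress

// Hypothetical helper: per-partition lag = latestOffset - endOffset for one Kafka source entry.
def kafkaLag(s: SourceProgress): Map[String, Long] = {
  implicit val formats: Formats = DefaultFormats

  // Kafka offsets are JSON maps of topic -> partition -> offset, as in the excerpt above.
  def offsets(json: String): Map[String, Map[String, Long]] =
    if (json == null) Map.empty else parse(json).extract[Map[String, Map[String, Long]]]

  val end = offsets(s.endOffset)
  val latest = offsets(s.latestOffset)
  for {
    (topic, partitions) <- latest
    (partition, latestOffset) <- partitions
    endOffset = end.getOrElse(topic, Map.empty[String, Long]).getOrElse(partition, latestOffset)
  } yield s"$topic-$partition" -> (latestOffset - endOffset)
}
```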

Closes apache#30988 from viirya/latest-offset.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
viirya authored and dongjoon-hyun committed Jan 3, 2021
1 parent cfd4a08 commit 963c60f
Showing 10 changed files with 75 additions and 12 deletions.

@@ -64,6 +64,8 @@ private[kafka010] class KafkaMicroBatchStream(

private var endPartitionOffsets: KafkaSourceOffset = _

private var latestPartitionOffsets: PartitionOffsetMap = _

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -77,14 +79,18 @@ private[kafka010] class KafkaMicroBatchStream(
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

override def reportLatestOffset(): Offset = {
KafkaSourceOffset(latestPartitionOffsets)
}

override def latestOffset(): Offset = {
throw new UnsupportedOperationException(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(readLimit match {
case rows: ReadMaxRows =>
rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)

@@ -115,8 +115,13 @@ private[kafka010] class KafkaSource(
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

// The offsets up to which each topic-partition has currently been read for processing. Note these
// are not necessarily the latest offsets, because a read limit may be applied.
private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

// The latest offsets for each topic-partition.
private var latestPartitionOffsets: Option[Map[TopicPartition, Long]] = None

private val converter = new KafkaRecordToRowConverter()

override def schema: StructType = KafkaRecordToRowConverter.kafkaSchema(includeHeaders)
@@ -127,6 +132,10 @@ private[kafka010] class KafkaSource(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def reportLatestOffset(): streaming.Offset = {
latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
@@ -145,6 +154,7 @@ private[kafka010] class KafkaSource(
}

currentPartitionOffsets = Some(offsets)
latestPartitionOffsets = Some(latest)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
KafkaSourceOffset(offsets)
}

5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -37,7 +37,10 @@ object MimaExcludes {
// Exclude rules for 3.2.x
lazy val v32excludes = v31excludes ++ Seq(
// [SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder"),

// [SPARK-33955] Add latest offsets to source progress
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceProgress.this")
)

// Exclude rules for 3.1.x

@@ -53,4 +53,12 @@ public interface SupportsAdmissionControl extends SparkDataStream {
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
Offset latestOffset(Offset startOffset, ReadLimit limit);

/**
* Returns the most recent offset available.
*
* The source can return `null` if there is no data to process or the source does not support
* this method.
*/
default Offset reportLatestOffset() { return null; }
}
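
To make the new hook concrete, here is a hedged sketch (not from this PR; every name below is hypothetical) of a custom source implementing `SupportsAdmissionControl` in the same way the Kafka sources above do: it caches the latest available offset while computing `latestOffset(start, limit)` and hands it back through `reportLatestOffset()`:

```scala
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{
  MicroBatchStream, Offset, ReadLimit, SupportsAdmissionControl}

// Hypothetical offset type: a single increasing counter.
case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

class CounterMicroBatchStream extends MicroBatchStream with SupportsAdmissionControl {
  // Stand-in for whatever the external system currently holds.
  private def fetchLatestFromSystem(): Long = System.currentTimeMillis() / 1000

  private var cachedLatest: Option[CounterOffset] = None

  override def initialOffset(): Offset = CounterOffset(0L)

  override def latestOffset(): Offset =
    throw new UnsupportedOperationException(
      "latestOffset(Offset, ReadLimit) should be called instead of this method")

  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    // Cache the latest available offset when the end of the next batch is decided.
    val latest = CounterOffset(fetchLatestFromSystem())
    cachedLatest = Some(latest)
    // A real source would apply `limit` here; this sketch reads everything available.
    latest
  }

  // New in this PR: surface the cached latest offset for progress reporting.
  override def reportLatestOffset(): Offset = cachedLatest.orNull

  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}

  // The reading path is out of scope for this sketch.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
    Array.empty
  override def createReaderFactory(): PartitionReaderFactory =
    throw new UnsupportedOperationException("not part of this sketch")
}
```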

@@ -212,7 +212,10 @@ class MicroBatchExecution(
}

// Record the trigger offset range for progress reporting *before* processing the batch
recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
recordTriggerOffsets(
from = committedOffsets,
to = availableOffsets,
latest = latestOffsets)

// Remember whether the current batch has data or not. This will be required later
// for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
@@ -379,7 +382,7 @@ class MicroBatchExecution(
if (isCurrentBatchConstructed) return true

// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
val (nextOffsets, recentOffsets) = uniqueSources.toSeq.map {
case (s: SupportsAdmissionControl, limit) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
Expand All @@ -391,23 +394,31 @@ class MicroBatchExecution(
startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
.getOrElse(v2.initialOffset())
}
(s, Option(s.latestOffset(startOffset, limit)))
val next = s.latestOffset(startOffset, limit)
val latest = s.reportLatestOffset()
((s, Option(next)), (s, Option(latest)))
}
case (s: Source, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
val offset = s.getOffset
((s, offset), (s, offset))
}
case (s: MicroBatchStream, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
(s, Option(s.latestOffset()))
val latest = s.latestOffset()
((s, Option(latest)), (s, Option(latest)))
}
case (s, _) =>
// for some reason, the compiler is unhappy and thinks the match is not exhaustive
throw new IllegalStateException(s"Unexpected source: $s")
}
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
}.unzip

availableOffsets ++= nextOffsets.filter { case (_, o) => o.nonEmpty }
.map(p => p._1 -> p._2.get).toMap
latestOffsets ++= recentOffsets.filter { case (_, o) => o.nonEmpty }
.map(p => p._1 -> p._2.get).toMap

// Update the query metadata
offsetSeqMetadata = offsetSeqMetadata.copy(
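
The pair-of-pairs plus `unzip` idiom introduced above can be illustrated in isolation (all values below are made up):

```scala
// Each source contributes a ((source, nextOffset), (source, latestOffset)) pair,
// and unzip splits them into two aligned sequences.
val perSource: Seq[((String, Option[Long]), (String, Option[Long]))] = Seq(
  (("kafka", Some(100L)), ("kafka", Some(120L))),
  (("file",  Some(7L)),   ("file",  None))
)

val (nextOffsets, recentOffsets) = perSource.unzip

val available = nextOffsets.filter { case (_, o) => o.nonEmpty }
  .map(p => p._1 -> p._2.get).toMap   // Map(kafka -> 100, file -> 7)
val latest = recentOffsets.filter { case (_, o) => o.nonEmpty }
  .map(p => p._1 -> p._2.get).toMap   // Map(kafka -> 120)
```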

@@ -71,6 +71,8 @@ trait ProgressReporter extends Logging {
private var currentTriggerEndTimestamp = -1L
private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _
private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _
private var currentTriggerLatestOffsets: Map[SparkDataStream, String] = _

// TODO: Restore this from the checkpoint when possible.
private var lastTriggerStartTimestamp = -1L

@@ -119,16 +121,21 @@ trait ProgressReporter extends Logging {
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
currentTriggerStartOffsets = null
currentTriggerEndOffsets = null
currentTriggerLatestOffsets = null
currentDurationsMs.clear()
}

/**
* Record the offsets range this trigger will process. Call this before updating
* `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
*/
protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
protected def recordTriggerOffsets(
from: StreamProgress,
to: StreamProgress,
latest: StreamProgress): Unit = {
currentTriggerStartOffsets = from.mapValues(_.json).toMap
currentTriggerEndOffsets = to.mapValues(_.json).toMap
currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
}

private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -151,7 +158,8 @@
* though the sources don't have any new data.
*/
protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
currentTriggerLatestOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

val executionStats = extractExecutionStats(hasNewData, hasExecuted)
@@ -171,6 +179,7 @@
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
latestOffset = currentTriggerLatestOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec

@@ -161,6 +161,15 @@ abstract class StreamExecution(
@volatile
var availableOffsets = new StreamProgress

/**
* Tracks the latest offsets for each input source.
* Only the scheduler thread should modify this field, and only in atomic steps.
* Other threads should make a shallow copy if they are going to access this field more than
* once, since the field's value may change at any time.
*/
@volatile
var latestOffsets = new StreamProgress

@volatile
var sinkCommitProgress: Option[StreamWriterCommitProgress] = None

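
The guidance in the new field's comment, to read the volatile field once and then work on that snapshot, amounts to the following pattern (illustrative names, not Spark code):

```scala
// Illustrative stand-in for StreamExecution's volatile progress fields.
class Tracker {
  @volatile var latestOffsets: Map[String, Long] = Map.empty

  // The scheduler thread replaces the whole map atomically.
  def update(newOffsets: Map[String, Long]): Unit = {
    latestOffsets = newOffsets
  }
}

// Another thread reads the field once into a local val, then performs several
// accesses against that single consistent snapshot.
def report(tracker: Tracker): Unit = {
  val snapshot = tracker.latestOffsets   // single volatile read
  println(s"sources tracked: ${snapshot.size}")
  snapshot.foreach { case (source, offset) => println(s"$source -> $offset") }
}
```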

@@ -332,7 +332,7 @@ class ContinuousExecution(

synchronized {
// Record offsets before updating `committedOffsets`
recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
recordTriggerOffsets(from = committedOffsets, to = availableOffsets, latest = latestOffsets)
if (queryExecutionThread.isAlive) {
commitLog.add(epoch, CommitMetadata())
val offset =

@@ -173,6 +173,7 @@ class StreamingQueryProgress private[sql](
* @param description Description of the source.
* @param startOffset The starting offset for data being read.
* @param endOffset The ending offset for data being read.
* @param latestOffset The latest offset from this source.
* @param numInputRows The number of records read from this source.
* @param inputRowsPerSecond The rate at which data is arriving from this source.
* @param processedRowsPerSecond The rate at which data from this source is being processed by
@@ -184,6 +185,7 @@ class SourceProgress protected[sql](
val description: String,
val startOffset: String,
val endOffset: String,
val latestOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
val processedRowsPerSecond: Double) extends Serializable {
@@ -204,6 +206,7 @@ class SourceProgress protected[sql](
("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("latestOffset" -> tryParse(latestOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))

@@ -75,6 +75,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "description" : "source",
| "startOffset" : 123,
| "endOffset" : 456,
| "latestOffset" : 789,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0
| } ],
@@ -121,6 +122,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "description" : "source",
| "startOffset" : 123,
| "endOffset" : 456,
| "latestOffset" : 789,
| "numInputRows" : 678
| } ],
| "sink" : {
@@ -333,6 +335,7 @@ object StreamingQueryStatusAndProgressSuite {
description = "source",
startOffset = "123",
endOffset = "456",
latestOffset = "789",
numInputRows = 678,
inputRowsPerSecond = 10.0,
processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
@@ -361,6 +364,7 @@ object StreamingQueryStatusAndProgressSuite {
description = "source",
startOffset = "123",
endOffset = "456",
latestOffset = "789",
numInputRows = 678,
inputRowsPerSecond = Double.NaN, // should not be present in the json
processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
