Skip to content

Commit

Permalink
[SPARK-43183][SS] Introduce a new callback "onQueryIdle" to Streaming…
Browse files Browse the repository at this point in the history
…QueryListener

### What changes were proposed in this pull request?

This PR introduces a new callback "onQueryIdle" to StreamingQueryListener. The idle notification was previously delivered as part of the query progress update (onQueryProgress).

The signature of the new callback method is below:

```
def onQueryIdle(event: QueryIdleEvent): Unit

class QueryIdleEvent(val id: UUID, val runId: UUID) extends Event
```

This PR proposes to provide a default implementation for onQueryIdle in StreamingQueryListener so that it does not break existing implementations of streaming query listener in Scala/Java.

Note that this is a behavioral change: users will receive a different callback when the streaming query has been idle for the configured period of time (previously they received the onQueryProgress callback). This is worth doing, as described in the section "Why are the changes needed?".

### Why are the changes needed?

People have had a lot of confusion about the query progress event on an idle query. It is not only a matter of understanding; it also leads to various complaints, because users tend to think the event only happens after a microbatch has finished. In addition, the misunderstanding may lead to data loss in monitoring: since we give the latest batch ID in the update event on idleness, a listener implementation that blindly upserts the information into external storage keyed by batch ID risks losing data.

Reporting progress events for idle queries also complicates the logic, because we have to remember the execution of the previous batch, which is arguably not necessary.

### Does this PR introduce _any_ user-facing change?

Yes. After this change, users won't get query progress update event from idle query. Instead, they will get query idle event.

### How was this patch tested?

Modified UTs.

Closes apache#40845 from HeartSaVioR/SPARK-43183.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Apr 20, 2023
1 parent 47ac097 commit 2178769
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 125 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3577,6 +3577,9 @@ def observe(
... # Trigger alert
... pass
...
... def onQueryIdle(self, event):
... pass
...
... def onQueryTerminated(self, event):
... pass
...
Expand Down
46 changes: 46 additions & 0 deletions python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class StreamingQueryListener(ABC):
... # Do something with event.
... pass
...
... def onQueryIdle(self, event: QueryIdleEvent) -> None:
... # Do something with event.
... pass
...
... def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
... # Do something with event.
... pass
Expand Down Expand Up @@ -87,6 +91,13 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
"""
pass

# NOTE: Intentionally NOT decorated with @abstractmethod. Listener subclasses written
# before this callback existed do not define it; marking it abstract would make those
# subclasses impossible to instantiate. A concrete no-op default keeps them working.
def onQueryIdle(self, event: "QueryIdleEvent") -> None:
    """
    Called when the query is idle and waiting for new data to process.

    The default implementation does nothing; override it to react to idleness.
    """
    pass

@abstractmethod
def onQueryTerminated(self, event: "QueryTerminatedEvent") -> None:
"""
Expand Down Expand Up @@ -123,6 +134,9 @@ def onQueryStarted(self, jevent: JavaObject) -> None:
def onQueryProgress(self, jevent: JavaObject) -> None:
self.pylistener.onQueryProgress(QueryProgressEvent(jevent))

def onQueryIdle(self, jevent: JavaObject) -> None:
    """Wrap the JVM-side event in a ``QueryIdleEvent`` and hand it to the Python listener."""
    idle_event = QueryIdleEvent(jevent)
    self.pylistener.onQueryIdle(idle_event)

def onQueryTerminated(self, jevent: JavaObject) -> None:
self.pylistener.onQueryTerminated(QueryTerminatedEvent(jevent))

Expand Down Expand Up @@ -200,6 +214,38 @@ def progress(self) -> "StreamingQueryProgress":
return self._progress


class QueryIdleEvent:
    """
    Event representing that query is idle and waiting for new data to process.

    .. versionadded:: 3.5.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, jevent: JavaObject) -> None:
        # Both identifiers are read from the JVM event via their string form and
        # re-parsed into native Python UUID objects.
        query_id = jevent.id().toString()
        self._id: uuid.UUID = uuid.UUID(query_id)
        run_id = jevent.runId().toString()
        self._runId: uuid.UUID = uuid.UUID(run_id)

    @property
    def id(self) -> uuid.UUID:
        """
        A unique query id that persists across restarts. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
        """
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        """
        A query id that is unique for every start/restart. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
        """
        return self._runId


class QueryTerminatedEvent:
"""
Event representing that termination of a query.
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ def addListener(self, listener: StreamingQueryListener) -> None:
... def onQueryProgress(self, event):
... pass
...
... def onQueryIdle(self, event):
... pass
...
... def onQueryTerminated(self, event):
... pass
>>> test_listener = TestListener()
Expand Down Expand Up @@ -617,6 +620,9 @@ def removeListener(self, listener: StreamingQueryListener) -> None:
... def onQueryProgress(self, event):
... pass
...
... def onQueryIdle(self, event):
... pass
...
... def onQueryTerminated(self, event):
... pass
>>> test_listener = TestListener()
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/tests/streaming/test_streaming_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def onQueryProgress(self, event):
nonlocal progress_event
progress_event = event

def onQueryIdle(self, event):
pass

def onQueryTerminated(self, event):
nonlocal terminated_event
terminated_event = event
Expand Down Expand Up @@ -281,6 +284,9 @@ def onQueryStarted(self, event):
def onQueryProgress(self, event):
pass

def onQueryIdle(self, event):
pass

def onQueryTerminated(self, event):
pass

Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/sql/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,9 @@ def onQueryProgress(self, event):
nonlocal observed_metrics
observed_metrics = event.progress.observedMetrics

def onQueryIdle(self, event):
pass

def onQueryTerminated(self, event):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2435,7 +2435,7 @@ object SQLConf {
val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
buildConf("spark.sql.streaming.noDataProgressEventInterval")
.internal()
.doc("How long to wait between two progress events when there is no data")
.doc("How long to wait before providing query idle event when there is no data")
.version("2.1.1")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(10000L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsS
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
import org.apache.spark.util.Clock

/**
Expand Down Expand Up @@ -89,7 +89,7 @@ trait ProgressReporter extends Logging {
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

// The timestamp we report an event that has not executed anything
private var lastNoExecutionProgressEventTime = Long.MinValue
private var lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
Expand Down Expand Up @@ -153,6 +153,11 @@ trait ProgressReporter extends Logging {
logInfo(s"Streaming query made progress: $newProgress")
}

/**
* Posts a [[QueryIdleEvent]] carrying this query's `id` and `runId` via `postEvent`,
* then logs that the query has been idle.
*/
private def postIdleness(): Unit = {
postEvent(new QueryIdleEvent(id, runId))
logInfo("Streaming query has been idle and waiting for new data.")
}

/**
* Finalizes the query progress and adds it to list of recent status updates.
*
Expand All @@ -166,108 +171,96 @@ trait ProgressReporter extends Logging {
currentTriggerLatestOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

val executionStats = extractExecutionStats(hasNewData, hasExecuted)
val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
if (hasExecuted) {
val executionStats = extractExecutionStats(hasNewData)
val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND

val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
(currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
} else {
Double.PositiveInfinity
}
logDebug(s"Execution stats: $executionStats")
val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
(currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
} else {
Double.PositiveInfinity
}
logDebug(s"Execution stats: $executionStats")

val sourceProgress = sources.distinct.map { source =>
val numRecords = executionStats.inputRows.getOrElse(source, 0L)
val sourceMetrics = source match {
case withMetrics: ReportsSourceMetrics =>
withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
case _ => Map[String, String]().asJava
}
new SourceProgress(
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
latestOffset = currentTriggerLatestOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec,
metrics = sourceMetrics
)
}

val sourceProgress = sources.distinct.map { source =>
val numRecords = executionStats.inputRows.getOrElse(source, 0L)
val sourceMetrics = source match {
case withMetrics: ReportsSourceMetrics =>
withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
val sinkOutput = sinkCommitProgress.map(_.numOutputRows)
val sinkMetrics = sink match {
case withMetrics: ReportsSinkMetrics =>
withMetrics.metrics()
case _ => Map[String, String]().asJava
}
new SourceProgress(
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
latestOffset = currentTriggerLatestOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec,
metrics = sourceMetrics
)
}

val sinkOutput = if (hasExecuted) {
sinkCommitProgress.map(_.numOutputRows)
} else {
sinkCommitProgress.map(_ => 0L)
}

val sinkMetrics = sink match {
case withMetrics: ReportsSinkMetrics =>
withMetrics.metrics()
case _ => Map[String, String]().asJava
}

val sinkProgress = SinkProgress(
sink.toString, sinkOutput, sinkMetrics)

val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)

val newProgress = new StreamingQueryProgress(
id = id,
runId = runId,
name = name,
timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
batchDuration = processingTimeMills,
durationMs =
new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress,
observedMetrics = new java.util.HashMap(observedMetrics.asJava))
val sinkProgress = SinkProgress(
sink.toString, sinkOutput, sinkMetrics)

val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)

val newProgress = new StreamingQueryProgress(
id = id,
runId = runId,
name = name,
timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
batchDuration = processingTimeMills,
durationMs =
new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress,
observedMetrics = new java.util.HashMap(observedMetrics.asJava))

if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
lastNoExecutionProgressEventTime = now
updateProgress(newProgress)
postIdleness()
}
}

currentStatus = currentStatus.copy(isTriggerActive = false)
}

/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
// lastExecution could belong to one of the previous triggers if `!hasExecuted`.
// Walking the plan again should be inexpensive.
private def extractStateOperatorMetrics(): Seq[StateOperatorProgress] = {
assert(lastExecution != null, "lastExecution is not available")
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
if (hasExecuted) {
progress
} else {
progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
}
p.asInstanceOf[StateStoreWriter].getProgress()
}
}

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]

// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
val stateOperators = extractStateOperatorMetrics(hasExecuted)
val stateOperators = extractStateOperatorMetrics()

if (!hasNewData) {
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
if (shouldReport(queryProgress.progress.runId)) {
listener.onQueryProgress(queryProgress)
}
case queryIdle: QueryIdleEvent =>
if (shouldReport(queryIdle.runId)) {
listener.onQueryIdle(queryIdle)
}
case queryTerminated: QueryTerminatedEvent =>
if (shouldReport(queryTerminated.runId)) {
listener.onQueryTerminated(queryTerminated)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ abstract class StreamingQueryListener {
*/
def onQueryProgress(event: QueryProgressEvent): Unit

/**
* Called when the query is idle and waiting for new data to process.
*
* The default implementation is a no-op, so listeners that do not override this
* method keep working unchanged.
*
* @param event the idle event, carrying the query's `id` and `runId`
* @since 3.5.0
*/
def onQueryIdle(event: QueryIdleEvent): Unit = {}

/**
* Called when a query is stopped, with or without error.
* @since 2.0.0
Expand All @@ -72,6 +78,8 @@ private[spark] trait PythonStreamingQueryListener {

def onQueryProgress(event: QueryProgressEvent): Unit

def onQueryIdle(event: QueryIdleEvent): Unit

def onQueryTerminated(event: QueryTerminatedEvent): Unit
}

Expand All @@ -83,6 +91,8 @@ private[spark] class PythonStreamingQueryListenerWrapper(

def onQueryProgress(event: QueryProgressEvent): Unit = listener.onQueryProgress(event)

override def onQueryIdle(event: QueryIdleEvent): Unit = listener.onQueryIdle(event)

def onQueryTerminated(event: QueryTerminatedEvent): Unit = listener.onQueryTerminated(event)
}

Expand Down Expand Up @@ -123,6 +133,15 @@ object StreamingQueryListener {
@Evolving
class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event

/**
* Event representing that query is idle and waiting for new data to process.
*
* @param id A unique query id that persists across restarts.
* @param runId A query id that is unique for every start/restart.
* @since 3.5.0
*/
@Evolving
class QueryIdleEvent private[sql](
val id: UUID,
val runId: UUID) extends Event

/**
* Event representing that termination of a query.
*
Expand Down
Loading

0 comments on commit 2178769

Please sign in to comment.