shi*_*455 9 apache-spark spark-structured-streaming
我正在使用 Spark 2.1 并尝试优雅地停止流式查询。
是StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:
void stop()如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0
而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit停止流的执行,可选择确保所有接收到的数据都已处理。
stopSparkContext如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。
stopGracefully如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止
所以问题是如何优雅地停止结构化流查询?
对于 PySpark 用户,这是 @ASe 答案的 Python 端口
# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
"""Stop a running streaming query"""
while query.isActive:
msg = query.status['message']
data_avail = query.status['isDataAvailable']
trigger_active = query.status['isTriggerActive']
if not data_avail and not trigger_active and msg != "Initializing sources":
print('Stopping query...')
query.stop()
time.sleep(0.5)
# Okay wait for the stop to happen
print('Awaiting termination...')
query.awaitTermination(wait_time)
Run Code Online (Sandbox Code Playgroud)
如果“优雅地”是指流式查询应该完成数据处理,void stop()则不会这样做。它只会等到执行执行的线程停止(如文档中所述)。这并不意味着它将完成处理。
为此,我们需要让查询等待,直到查询的当前触发器完成。我们可以通过 来检查StreamingQueryStatus,像这样:
while (query.status.isTriggerActive) {//什么都不做}
它将等到查询完成处理。然后我们可以调用query.stop().
我希望它有帮助!
如果没有更多的记录可供消费,这样的代码可以帮助停止微批处理流
def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
while (query.isActive) {
val msg = query.status.message
if (!query.status.isDataAvailable
&& !query.status.isTriggerActive
&& !msg.equals("Initializing sources")) {
query.stop()
}
query.awaitTermination(awaitTerminationTimeMs)
}
}
Run Code Online (Sandbox Code Playgroud)
这取决于“优雅地”是什么意思:)
StreamingQuery 仅停止特定查询。它会等待,直到 MicroBatch 线程停止并准备好关闭源。这个“等待”意味着数据将被处理,然后线程将停止
| 归档时间: |
|
| 查看次数: |
5438 次 |
| 最近记录: |