Maa*_*mon 8 apache-kafka apache-spark spark-streaming spark-structured-streaming
在我的场景中,我有几个数据集时不时出现,我需要在我们的平台中摄取它们。摄取过程涉及几个转换步骤。其中之一是 Spark。到目前为止,我特别使用火花结构化流媒体。基础设施还涉及 kafka,spark 结构化流从中读取数据。
我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他可消费的东西来决定停止工作。那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它。出于特定原因,我们决定不使用 spark 的批处理版本。
因此,是否有任何超时或可用于检测没有更多数据进入并且所有内容都已处理的东西。
谢谢
小智 1
你也许可以使用这个:-
def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
while (query.isActive) {
try{
if(query.lastProgress.numInputRows < 10){
query.awaitTermination(1000)
}
}
catch
{
case e:NullPointerException => println("First Batch")
}
Thread.sleep(500)
}
}
Run Code Online (Sandbox Code Playgroud)
您可以创建一个 numInputRows 变量。
归档时间: |
|
查看次数: |
3468 次 |
最近记录: |