Nav*_*tha 5 apache-spark spark-structured-streaming
我建立了几个可在EMR上运行的Spark结构化流查询,它们是长时间运行的查询,并且需要始终运行,因为它们都是ETL类型的查询,当我向EMR上的YARN群集提交作业时,我可以提交一个单火花应用。因此,该spark应用程序应具有多个流查询。
我对如何以编程方式在同一提交中构建/启动多个流查询感到困惑。
例如:我有以下代码:
case class SparkJobs(prop: Properties) extends Serializable {
def run() = {
Type1SparkJobBuilder(prop).build().awaitTermination()
Type1SparkJobBuilder(prop).build().awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
我在我的主课上用 SparkJobs(new Properties()).run()
当我在Spark历史记录服务器中看到时,只有第一个Spark Streaming作业(Type1SparkJob)正在运行。
建议以编程方式在同一spark内引发多个流查询的方法是什么,我也找不到合适的文档。
由于您正在调用awaitTermination第一个查询,因此在开始第二个查询之前,它将阻塞直到完成为止。因此,您想启动两个查询,然后使用StreamingQueryManager.awaitAnyTermination。
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
spark.streams.awaitAnyTermination()
Run Code Online (Sandbox Code Playgroud)
除上述内容外,默认情况下,Spark使用FIFO调度程序。这意味着第一个查询在执行时会获取集群中的所有资源。由于您尝试同时运行多个查询,因此应切换到FAIR调度程序
如果您有一些查询应该拥有比其他查询更多的资源,那么您还可以调整各个调度程序池。
| 归档时间: |
|
| 查看次数: |
1177 次 |
| 最近记录: |