宇宙人*_*宇宙人 10 scala akka apache-spark
在我的spark应用程序中,有一个object ResourceFactory包含ActorSystem用于提供资源客户端的akka .所以当我运行这个spark应用程序时,每个工作节点都会创建一个ActorSystem.问题是当spark应用程序完成其工作并关闭时.在ActorSystem仍保持活着每一个工作节点上,并阻止整个应用程序终止,它只是挂在.
有没有办法注册一些监听器,SparkContext以便sc在关闭时,ActorSystem每个工作节点上的每个工作节点都会收到通知自己关闭?
更新:
以下是简化的骨架:
有一个ResourceFactory,它是一个object,它包含一个actor system.它还提供了一种fetchData方法.
object ResourceFactory{
val actorSystem = ActorSystem("resource-akka-system")
def fetchData(): SomeData = ...
}
Run Code Online (Sandbox Code Playgroud)
然后,有一个user-defined RDD类,在它的compute方法中,它需要从中获取数据ResourceFactory.
class MyRDD extends RDD[SomeClass] {
override def compute(...) {
...
ResourceFactory.fetchData()
...
someIterator
}
}
Run Code Online (Sandbox Code Playgroud)
因此,在每个节点上都会有一个ActorSystem名为"resource-akka-system"的MyRDD节点,分布在这些工作节点上的那些实例可以从"resource-akka-system"获取数据.
问题是,当SparkContext关闭时,不需要那些"resource-akka-system",但我不知道如何在ResourceFactory关闭时通知关闭"resource-akka-system" SparkContext.所以现在,"resouce-akka-system"在每个工作节点上都保持活动状态,并阻止整个程序退出.
UPDATE2:
通过一些实验,我发现在本地模式下程序挂起,但在yarn-cluster模式下,程序将成功退出.可能是因为yarn当sc关闭时会杀死工作节点上的线程吗?
UPDATE3:
要检查每个节点是否包含一个ActorSystem,我将代码更改如下(以下是真正的骨架,因为我添加了另一个类定义):
object ResourceFactory{
println("creating resource factory")
val actorSystem = ActorSystem("resource-akka-system")
def fetchData(): SomeData = ...
}
class MyRDD extends RDD[SomeClass] {
println("creating my rdd")
override def compute(...) {
new RDDIterator(...)
}
}
class RDDIterator(...) extends Iterator[SomeClass] {
println("creating rdd iterator")
...
lazy val reader = {
...
ResourceFactory.fetchData()
...
}
...
override next() = {
...
reader.xx()
}
}
Run Code Online (Sandbox Code Playgroud)
添加这些后println,我在yarn-cluster模式下运行spark代码.我发现在驱动程序上我有以下打印:
creating my rdd
creating resource factory
creating my rdd
...
Run Code Online (Sandbox Code Playgroud)
在一些工人身上,我有以下印刷品:
creating rdd iterator
creating resource factory
Run Code Online (Sandbox Code Playgroud)
而一些工人,它什么都不打印(并且所有工作都没有分配任何任务).
基于上述,我觉得object在驱动程序初始化急切,因为它打印creating resource factory的驱动程序,即使没有事情是指它,object因为它打印懒洋洋地在工人正在初始化creating resource factory打印后creating rdd iterator作为资源工厂懒洋洋地通过创建的第一个RDDIterator引用.
我发现在我的用例中,MyRDD类只在驱动程序中创建.
我不太确定objecton驱动程序和worker 的初始化的懒惰,这是我的猜测,因为可能是程序的其他部分导致它看起来像那样.但我认为在必要时每个工作节点上都有一个actor系统应该是正确的.
我不认为有一种方法可以利用每一个Worker生命周期。
我对您的实施还有一些疑问:
如果您有objectcontains val,它是在工作程序上运行的函数中使用的,我的理解是它val会被序列化并广播给工作程序。您能否确认每个工作人员都运行一个 ActorSystem?
如果您没有显式等待 Actor 系统终止,那么它通常会立即终止。你是在打电话system.awaitTermination还是在阻止system.whenTerminated?
无论如何,还有另一种方法可以关闭远程工作人员上的 Actor 系统:
sc广播给每个工作人员。简单来说,只要有val那个地址就可以了。