如何在火花停止时清理其他资源

宇宙人*_*宇宙人 10 scala akka apache-spark

在我的spark应用程序中,有一个object ResourceFactory包含ActorSystem用于提供资源客户端的akka .所以当我运行这个spark应用程序时,每个工作节点都会创建一个ActorSystem.问题是当spark应用程序完成其工作并关闭时.在ActorSystem仍保持活着每一个工作节点上,并阻止整个应用程序终止,它只是挂在.

有没有办法注册一些监听器,SparkContext以便sc在关闭时,ActorSystem每个工作节点上的每个工作节点都会收到通知自己关闭?


更新:

以下是简化的骨架:

有一个ResourceFactory,它是一个object,它包含一个actor system.它还提供了一种fetchData方法.

object ResourceFactory{
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
Run Code Online (Sandbox Code Playgroud)

然后,有一个user-defined RDD类,在它的compute方法中,它需要从中获取数据ResourceFactory.

class MyRDD extends RDD[SomeClass] {
  override def compute(...) {
    ...
    ResourceFactory.fetchData()
    ...
    someIterator
  }
}
Run Code Online (Sandbox Code Playgroud)

因此,在每个节点上都会有一个ActorSystem名为"resource-akka-system"的MyRDD节点,分布在这些工作节点上的那些实例可以从"resource-akka-system"获取数据.

问题是,当SparkContext关闭时,不需要那些"resource-akka-system",但我不知道如何在ResourceFactory关闭时通知关闭"resource-akka-system" SparkContext.所以现在,"resouce-akka-system"在每个工作节点上都保持活动状态,并阻止整个程序退出.


UPDATE2:

通过一些实验,我发现在本地模式下程序挂起,但在yarn-cluster模式下,程序将成功退出.可能是因为yarnsc关闭时会杀死工作节点上的线程吗?


UPDATE3:

要检查每个节点是否包含一个ActorSystem,我将代码更改如下(以下是真正的骨架,因为我添加了另一个类定义):

object ResourceFactory{
  println("creating resource factory")
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}

class MyRDD extends RDD[SomeClass] {
  println("creating my rdd")
  override def compute(...) {
    new RDDIterator(...)
  }
}

class RDDIterator(...) extends Iterator[SomeClass] {
  println("creating rdd iterator")
  ...
  lazy val reader = {
    ...
    ResourceFactory.fetchData()
    ...
  }
  ...
  override next() = {
    ...
    reader.xx()
  }
}
Run Code Online (Sandbox Code Playgroud)

添加这些后println,我在yarn-cluster模式下运行spark代码.我发现在驱动程序上我有以下打印:

creating my rdd
creating resource factory
creating my rdd
...
Run Code Online (Sandbox Code Playgroud)

在一些工人身上,我有以下印刷品:

creating rdd iterator
creating resource factory
Run Code Online (Sandbox Code Playgroud)

而一些工人,它什么都不打印(并且所有工作都没有分配任何任务).

基于上述,我觉得object在驱动程序初始化急切,因为它打印creating resource factory的驱动程序,即使没有事情是指它,object因为它打印懒洋洋地在工人正在初始化creating resource factory打印后creating rdd iterator作为资源工厂懒洋洋地通过创建的第一个RDDIterator引用.

我发现在我的用例中,MyRDD类只在驱动程序中创建.

我不太确定objecton驱动程序和worker 的初始化的懒惰,这是我的猜测,因为可能是程序的其他部分导致它看起来像那样.但我认为在必要时每个工作节点上都有一个actor系统应该是正确的.

Aiv*_*ean 2

我不认为有一种方法可以利用每一个Worker生命周期。

我对您的实施还有一些疑问:

  1. 如果您有objectcontains val,它是在工作程序上运行的函数中使用的,我的理解是它val会被序列化并广播给工作程序。您能否确认每个工作人员都运行一个 ActorSystem?

  2. 如果您没有显式等待 Actor 系统终止,那么它通常会立即终止。你是在打电话system.awaitTermination还是在阻止system.whenTerminated



无论如何,还有另一种方法可以关闭远程工作人员上的 Actor 系统:

  1. 让你的 ActorSystem 在 akka 集群的每个节点上都成为一部分。以下是一些如何以编程方式执行此操作的文档。
  2. 将驱动程序节点(您所在的位置)上的“协调”Actor 的地址sc广播给每个工作人员。简单来说,只要有val那个地址就可以了。
  3. 当你的 akka 系统在每个工作线程上启动时,使用该“协调”Actor 地址来注册这个特定的 Actor 系统(向协调 Actor 发送相应的消息)。
  4. 协调 Actor 跟踪所有注册的“工作”Actor
  5. 当你的计算完成并且你想要关闭每个worker上的Akka系统时,从驱动节点上的协调Actor向所有注册的Actor发送消息。
  6. 收到“shutdown”消息时关闭工作 Akka 系统。