Scala并行集合和可变状态

zig*_*tar 4 parallel-processing scala

我有一个函数,通过更新映射执行一步计算(底层是一个可变的HashMap).我想并行执行其中几个计算(每个"链"在自己的可变HashMap上工作).

我通过将HashMaps放入并行集合并使用map将函数应用于每个HashMap来实现此目的.

现在我在地图中遇到了丢失的条目.在调试时,一旦异常断点停止程序,映射就会包含该条目(然后通过丢弃某些堆栈帧层来重新开始计算).

当我使用顺序集合时,此行为已消失.那么有可能存在一些不正常行为(或错误),这是由同一个HashMap在不同的线程中处理的吗?

我没有发布代码示例,因为我不认为该行为是可重现的.据我所知,唯一可变数据包含在那些保持计算状态的HashMaps中.

根据要求,我的代码示例创建(reset)和修改(step)的并行映射.

class ParallelInferer[V <: DiscreteVariable, TInf <: InferenceAlgorithm[V]](val numChains: Int,
                                              val inferer: InferenceAlgorithm[V],
                                              val random: Random) {
  //tuples of inferer state and the according random number generator
  protected var states: ParSeq[(inferer.Repr,Random)] = _
  protected var steps = 0

  reset()

  def reset() {
    steps = 0

    val seed = random.nextInt()

    //todo why does parallelizing fail here (key not found on a map)
    states = (1 to numChains).par
      .map(n => new Random(seed + n))    //create the rngs
      .map(rng => (inferer.createInitialState(rng),rng))
  }

  def step() {
    //advance every chain by one
    states = states.map{case (repr, rng) => (inferer.computeStep(repr, rng),rng)}
    steps = steps + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

代码说明

ParallelInferer班的目的是(也)为不可变的推论.因此,在发布的代码中不能直接看到可变性,但我认为它是显示的重要部分.

每个推理算法都有一个状态概念,这个状态是类型的InferenceAlgorithm#Repr- 在inferer.Repr作为states变量的一部分的用法中是明显的.引用者通过将Repr(和随机对象)映射到Repr具有其computeStep功能的新对象来工作.这可以在def step().现在,一些传入者使用可变对象HashMap作为其状态的一部分.他们的computeStep方法在变异之后返回它作为参数获得的相同地图.

  1. 我可以以某种方式解决这个问题吗?
  2. 我是否滥用并行集合并且应该将我的任务并行化?

编辑

我刚刚再次运行并行化版本,我认为它也会导致算法无法终止,尽管它在顺序运行时会发生.好吧,不是那么令人惊讶,不是吗?

有人可以猜测为什么会这样吗?

Dav*_*ith 5

是的,一点没错.默认情况下,Mutable HashMaps不是线程安全的,以这种方式使用它们会导致未定义的行为.缺少条目实际上是一种相当温和的表现形式.根据底层实现,还可能将HashMap数据结构破坏到程序进入无限循环的程度.

有很多方法可以解决这个问题,这将有不同的编码复杂性和性能权衡.最简单的方法是使用同步的哈希映射而不是非同步的哈希映射.

import scala.collection.mutable._

val map = new HashMap[Key, Value] with SynchronizedMap[Key, Value] 
Run Code Online (Sandbox Code Playgroud)

我不会说根本问题是你错误地使用并行集合,而是使用可变数据结构的任何并行程序都会出现这样的问题.更多Scala-ish方法是使用不可变映射,并让并行处理步骤返回新的不可变映射.由于不可变哈希映射的底层实现,这听起来很昂贵,但不一定如此.