线程安全地转换可变映射中的值

Tra*_*own 10 concurrency scala map thread-safety

假设我想在Scala中使用可变映射来跟踪我看到一些字符串的次数.在单线程环境中,这很容易:

import scala.collection.mutable.{ Map => MMap }

class Counter {
  val counts = MMap.empty[String, Int].withDefaultValue(0)

  def add(s: String): Unit = counts(s) += 1
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,这不是线程安全的,因为get并且update不会原子地发生.

并发映射为可变映射API 添加了一些原子操作,但不是我需要的那个,它看起来像这样:

def replace(k: A, f: B => B): Option[B]
Run Code Online (Sandbox Code Playgroud)

我知道我可以使用ScalaSTMTMap:

import scala.concurrent.stm._

class Counter {
  val counts =  TMap.empty[String, Int]

  def add(s: String): Unit = atomic { implicit txn =>
    counts(s) = counts.get(s).getOrElse(0) + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

但是(现在)这仍然是一个额外的依赖.其他选项包括actor(另一个依赖项),同步(可能效率较低)或Java的原子引用(较少惯用).

一般来说我会避免Scala中的可变地图,但我偶尔会需要这种东西,而且最近我使用了STM方法(而不是只是交叉我的手指,希望我不被天真的咬伤解).

我知道这里有许多权衡(额外依赖性与性能与清晰度等),但在Scala 2.10中是否存在类似"正确"解决此问题的答案?

Ion*_*tan 10

这个怎么样?假设你现在不需要一般replace方法,只需一个计数器.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object CountedMap {
  private val counts = new ConcurrentHashMap[String, AtomicInteger]

  def add(key: String): Int = {
    val zero = new AtomicInteger(0)
    val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero)
    value.incrementAndGet
  }
}
Run Code Online (Sandbox Code Playgroud)

与在整个地图上同步相比,您获得了更好的性能,并且还获得了原子增量.


Rég*_*les 3

最简单的解决方案肯定是同步。如果没有太多争用,性能可能不会那么差。

\n\n

否则,您可以尝试构建自己的类似 STM 的replace实现。像这样的事情可能会做:

\n\n
object ConcurrentMapOps {\n  private val rng = new util.Random\n  private val MaxReplaceRetryCount = 10\n  private val MinReplaceBackoffTime: Long = 1\n  private val MaxReplaceBackoffTime: Long = 20\n}\nimplicit class ConcurrentMapOps[A, B]( val m: collection.concurrent.Map[A,B] ) {\n  import ConcurrentMapOps._\n  private def replaceBackoff() {\n    Thread.sleep( (MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime) ).toLong ) // A bit crude, I know\n  }\n\n  def replace(k: A, f: B => B): Option[B] = {\n    m.get( k ) match {\n      case None => return None\n      case Some( old ) =>\n        var retryCount = 0\n        while ( retryCount <= MaxReplaceRetryCount ) {\n          val done = m.replace( k, old, f( old ) )\n          if ( done ) {\n            return Some( old )\n          }\n          else {         \n            retryCount += 1\n            replaceBackoff()\n          }\n        }\n        sys.error("Could not concurrently modify map")\n    }\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

请注意,冲突问题仅限于给定的键。如果两个线程访问同一个映射但处理不同的键,则不会发生冲突,并且替换操作总是会在第一次成功。如果检测到冲突,我们会等待一段时间(随机的时间,以便最大限度地减少线程永远争夺同一密钥的可能性),然后重试。

\n\n

我不能保证这是生产就绪的(我现在刚刚把它扔掉了),但这可能会起作用。

\n\n

更新:当然(正如 Ionu\xc8\x9b G. Stan 指出的那样),如果你想要的只是递增/递减一个值,javaConcurrentHashMap已经以无锁的方式提供了这些操作。\n如果您需要更通用的解决方案,则适用我的上述解决方案replace方法,将转换函数作为参数,则适用我的上述解决方案。

\n