Tra*_*own 10 concurrency scala map thread-safety
假设我想在Scala中使用可变映射来跟踪我看到一些字符串的次数.在单线程环境中,这很容易:
import scala.collection.mutable.{ Map => MMap }
class Counter {
val counts = MMap.empty[String, Int].withDefaultValue(0)
def add(s: String): Unit = counts(s) += 1
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不是线程安全的,因为get并且update不会原子地发生.
并发映射为可变映射API 添加了一些原子操作,但不是我需要的那个,它看起来像这样:
def replace(k: A, f: B => B): Option[B]
Run Code Online (Sandbox Code Playgroud)
import scala.concurrent.stm._
class Counter {
val counts = TMap.empty[String, Int]
def add(s: String): Unit = atomic { implicit txn =>
counts(s) = counts.get(s).getOrElse(0) + 1
}
}
Run Code Online (Sandbox Code Playgroud)
但是(现在)这仍然是一个额外的依赖.其他选项包括actor(另一个依赖项),同步(可能效率较低)或Java的原子引用(较少惯用).
一般来说我会避免Scala中的可变地图,但我偶尔会需要这种东西,而且最近我使用了STM方法(而不是只是交叉我的手指,希望我不被天真的咬伤解).
我知道这里有许多权衡(额外依赖性与性能与清晰度等),但在Scala 2.10中是否存在类似"正确"解决此问题的答案?
Ion*_*tan 10
这个怎么样?假设你现在不需要一般replace方法,只需一个计数器.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
object CountedMap {
private val counts = new ConcurrentHashMap[String, AtomicInteger]
def add(key: String): Int = {
val zero = new AtomicInteger(0)
val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero)
value.incrementAndGet
}
}
Run Code Online (Sandbox Code Playgroud)
与在整个地图上同步相比,您获得了更好的性能,并且还获得了原子增量.
最简单的解决方案肯定是同步。如果没有太多争用,性能可能不会那么差。
\n\n否则,您可以尝试构建自己的类似 STM 的replace实现。像这样的事情可能会做:
object ConcurrentMapOps {\n private val rng = new util.Random\n private val MaxReplaceRetryCount = 10\n private val MinReplaceBackoffTime: Long = 1\n private val MaxReplaceBackoffTime: Long = 20\n}\nimplicit class ConcurrentMapOps[A, B]( val m: collection.concurrent.Map[A,B] ) {\n import ConcurrentMapOps._\n private def replaceBackoff() {\n Thread.sleep( (MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime) ).toLong ) // A bit crude, I know\n }\n\n def replace(k: A, f: B => B): Option[B] = {\n m.get( k ) match {\n case None => return None\n case Some( old ) =>\n var retryCount = 0\n while ( retryCount <= MaxReplaceRetryCount ) {\n val done = m.replace( k, old, f( old ) )\n if ( done ) {\n return Some( old )\n }\n else { \n retryCount += 1\n replaceBackoff()\n }\n }\n sys.error("Could not concurrently modify map")\n }\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n\n请注意,冲突问题仅限于给定的键。如果两个线程访问同一个映射但处理不同的键,则不会发生冲突,并且替换操作总是会在第一次成功。如果检测到冲突,我们会等待一段时间(随机的时间,以便最大限度地减少线程永远争夺同一密钥的可能性),然后重试。
\n\n我不能保证这是生产就绪的(我现在刚刚把它扔掉了),但这可能会起作用。
\n\n更新:当然(正如 Ionu\xc8\x9b G. Stan 指出的那样),如果你想要的只是递增/递减一个值,javaConcurrentHashMap已经以无锁的方式提供了这些操作。\n如果您需要更通用的解决方案,则适用我的上述解决方案replace方法,将转换函数作为参数,则适用我的上述解决方案。
| 归档时间: |
|
| 查看次数: |
3263 次 |
| 最近记录: |