ConcurrentHashMap 的 Kotlin 并发

Lat*_*Dev 8 java concurrency java.util.concurrent thread-synchronization kotlin

我正在尝试支持定期清除的哈希图上的并发性。我有一个缓存,可以存储一段时间的数据。每 5 分钟后,此缓存中的数据将发送到服务器。一旦我刷新,我想清除缓存。问题是当我刷新时,当我使用现有密钥执行此操作时,数据可能会写入此映射。我将如何使这个进程线程安全?

data class A(val a: AtomicLong, val b: AtomicLong) {
   fun changeA() {
      a.incrementAndGet()
   }
}

class Flusher {
   private val cache: Map<String, A> = ConcurrentHashMap()
   private val lock = Any()
   fun retrieveA(key: String){
       synchronized(lock) {
          return cache.getOrPut(key) { A(key, 1) }
       }
   }
 
   fun flush() {
      synchronized(lock) {
           // send data to network request
           cache.clear()
      }
   }
}

// Existence of multiple classes like CacheChanger
class CacheChanger{
  fun incrementData(){
      flusher.retrieveA("x").changeA()
  }
}
Run Code Online (Sandbox Code Playgroud)

我担心上面的缓存没有正确同步。有没有更好/正确的方法来锁定这个缓存,这样我就不会丢失数据?我应该创建缓存的深层副本并清除它吗?

既然上面的数据可能被另一个更改器更改,那会不会导致问题?

cia*_*mej 3

你可以摆脱锁。

在flush方法中,不是读取整个map(例如通过迭代器)然后清除它,而是一一删除每个元素。

我不确定您是否可以使用迭代器的删除方法(我稍后会检查),但是您可以对键集进行迭代,并为每个键调用cache.remove() - 这将为您提供值存储并自动从缓存中删除它。

棘手的部分是如何确保 A 类的对象在通过网络发送之前不会被修改......您可以按如下方式执行:

当您获取某些x对象retrieveA并修改该对象时,您需要确保它仍在缓存中。只需再调用一次检索即可。如果你得到完全相同的对象那就没问题了。如果不同,则意味着对象已被删除并通过网络发送,但您不知道修改是否也已发送,或者修改之前对象的状态已发送。不过,我认为在您的情况下,您可以简单地重复整个过程(应用更改并检查对象是否相同)。但这取决于您的应用程序的具体情况。

如果您不想增加两次,那么当通过网络发送数据时,您必须读取 counter 的内容a,将其存储在某个局部变量中并减少a该数量(通常会变为零)。然后在 中CacheChanger,当您从第二次检索中获得不同的对象时,您可以检查该值是否为零(考虑了您的修改),或者非零(这意味着您的修改只晚了几分之一秒),并且你必须重复这个过程。

您也可以替换incrementAndGetcompareAndSwap,但这可能会产生稍差的性能。在这种方法中,您尝试交换一个大一的值,而不是递增。在通过网络发送之前,您尝试将值交换为 -1 以表示该值无效。如果第二次交换失败,则意味着有人同时更改了该值,您需要再检查一次,以便通过网络发送最新的值,并且您在循环中重复该过程(仅当交换到-1 成功)。在交换到大一的情况下,您还可以循环重复该过程,直到交换成功。如果失败,则意味着其他人交换到某个更大的值,或者交换Flusher到-1。在后一种情况下,您知道必须retrieveA再次调用才能获取新对象。