使用锁和ConcurrentHashMap缺少更新

Ami*_*mit 8 java concurrency multithreading

我有一个场景,我必须维护一个可以由多个线程填充的Map,每个线程修改它们各自的List(唯一标识符/键是线程名称),当线程的列表大小超过固定批量大小时,我们必须保持DB中的记录.

示例代码如下:聚合器类

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.unlock();
    }

}
Run Code Online (Sandbox Code Playgroud)

每隔2分钟再运行一个单独的线程(使用相同的锁定)以持久保存Map中的所有记录(以确保每2分钟后持续存在一些内容并且地图大小不会太大)

if(//Some condition) {
    Thread.sleep(//2 minutes);
    aggregator.getLock().lock();
    List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
    if(instrumentList.size() > 0) {
        saver.persist(instrumentList);
        instrumentMap .values().parallelStream().forEach(x -> x.clear());
        aggregator.getLock().unlock();
    }
}
Run Code Online (Sandbox Code Playgroud)

这个解决方案几乎适用于我们测试的每个场景,除非有时候我们看到一些记录丢失了,尽管它们在Map中添加得很好但根本没有保留

  1. 我的问题是这段代码有什么问题?
  2. ConcurrentHashMap不是最好的解决方案吗?
  3. 与ConcurrentHashMap一起使用的List是否有问题?
  4. 我应该在这里使用ConcurrentHashMap的计算方法(不需要我认为ReentrantLock已经在做同样的工作)?

Ami*_*mit 2

@Slaw 在评论中提供的答案达到了目的。我们让InstrumentList实例以非同步方式转义,即访问/操作在列表上发生,没有任何同步。通过将副本传递给进一步的方法来修复相同的问题就达到了目的。

以下代码行是发生此问题的代码行

recordSaver.persist(instrumentList); 仪器列表.clear();

在这里,我们允许instrumentList实例以非同步方式转义,即它被传递到另一个类(recordSaver.persist),在那里它要被操作,但我们也在下一行(在Aggregator类中)清除列表,并且所有这一切都是以非同步方式发生的。列表状态无法在记录保存程序中预测...这是一个非常愚蠢的错误。

我们通过将InstrumentList的克隆副本传递给 recordSaver.persist(...) 方法解决了该问题。这样,instrumentList.clear()不会影响 recordSaver 中可供进一步操作的列表。