Ami*_*mit 8 java concurrency multithreading
我有一个场景,我必须维护一个可以由多个线程填充的Map,每个线程修改它们各自的List(唯一标识符/键是线程名称),当线程的列表大小超过固定批量大小时,我们必须保持DB中的记录.
示例代码如下:聚合器类
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.unlock();
}
}
Run Code Online (Sandbox Code Playgroud)
每隔2分钟再运行一个单独的线程(使用相同的锁定)以持久保存Map中的所有记录(以确保每2分钟后持续存在一些内容并且地图大小不会太大)
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().lock();
List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().unlock();
}
}
Run Code Online (Sandbox Code Playgroud)
这个解决方案几乎适用于我们测试的每个场景,除非有时候我们看到一些记录丢失了,尽管它们在Map中添加得很好但根本没有保留
@Slaw 在评论中提供的答案达到了目的。我们让InstrumentList实例以非同步方式转义,即访问/操作在列表上发生,没有任何同步。通过将副本传递给进一步的方法来修复相同的问题就达到了目的。
以下代码行是发生此问题的代码行
recordSaver.persist(instrumentList); 仪器列表.clear();
在这里,我们允许instrumentList实例以非同步方式转义,即它被传递到另一个类(recordSaver.persist),在那里它要被操作,但我们也在下一行(在Aggregator类中)清除列表,并且所有这一切都是以非同步方式发生的。列表状态无法在记录保存程序中预测...这是一个非常愚蠢的错误。
我们通过将InstrumentList的克隆副本传递给 recordSaver.persist(...) 方法解决了该问题。这样,instrumentList.clear()不会影响 recordSaver 中可供进一步操作的列表。