ConcurrentSkipListMap如何使删除和添加调用原子

Ser*_*gey 6 java multithreading java.util.concurrent java-8

我有N个线程添加值和一个删除线程.我正在考虑如何同步添加到现有值列表和删除列表的最佳方法.

我想以下案例是可能的:

 thread 1 checked condition containsKey, and entered in else block
 thread 2 removed the value
 thread 1 try to add value to existing list, and get returns null
Run Code Online (Sandbox Code Playgroud)

我认为我可以使用的唯一方法是通过地图值进行同步,在我们的情况下是添加时和删除时的List

    private ConcurrentSkipListMap<LocalDateTime, List<Task>> tasks = new ConcurrentSkipListMap<>();

    //Thread1,3...N
    public void add(LocalDateTime time, Task task) {
        if (!tasks.containsKey(time)) {
            tasks.computeIfAbsent(time, k -> createValue(task));
        } else {
             //potentially should be synced
            tasks.get(time).add(task);
        }
    }
    private List<Task> createValue(Task val) {
        return new ArrayList<>(Arrays.asList(val));
    }

    //thread 2
   public void remove()
    while(true){
        Map.Entry<LocalDateTime, List<Task>> keyVal = tasks.firstEntry();
        if (isSomeCondition(keyVal)) {
            tasks.remove(keyVal.getKey());
            for (Task t : keyVal.getValue()) {
                //do task processing
            }
        }
    }
   }
Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 2

它\xe2\x80\x99s 并不完全清楚你的remove()方法应该做什么。在目前的形式中,它\xe2\x80\x99是一个无限循环,首先,它将迭代头元素并删除它们,直到头元素不满足条件,然后,它将重复轮询该头元素并重新评估病情。除非它设法删除所有元素,在这种情况下它将抛出异常。

\n\n

如果你想处理map中当前的所有元素,你可以简单地循环它,弱一致性迭代器允许你在修改它的同时继续进行;您可能会注意到或没有注意到正在进行的并发更新。

\n\n

如果只想处理匹配的头元素,则必须插入一个条件,返回到调用者或将线程置于睡眠状态(或者更好地添加通知机制),以避免因重复失败的测试而烧毁CPU(甚至在地图为空时抛出)。

\n\n

ConcurrentSkipListMap除此之外,您可以在确保功能之间不存在干扰的情况下执行操作。假设remove应该处理所有当前元素一次,实现可能看起来像

\n\n
public void add(LocalDateTime time, Task task) {\n    tasks.merge(time, Collections.singletonList(task),\n        (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()));\n}\n\npublic void remove() {\n    for(Map.Entry<LocalDateTime, List<Task>> keyVal : tasks.entrySet()) {\n        final List<Task> values = keyVal.getValue();\n        if(isSomeCondition(keyVal) && tasks.remove(keyVal.getKey(), values)) {\n            for (Task t : values) {\n                //do task processing\n            }\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

关键点是地图中包含的列表永远不会被修改。merge(time, Collections.singletonList(task), \xe2\x80\xa6如果没有先前的映射,该操作甚至会存储单个任务的不可变列表。如果存在以前的任务,合并功能(l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList())将创建一个新列表,而不是修改现有列表。当列表变得很大时,这可能会对性能产生影响,尤其是在争用的情况下必须重复操作时,但这\xe2\x80\x99是不需要锁定或额外同步的代价。

\n\n

remove操作使用的remove(key, value)方法只有在 map\xe2\x80\x99s 值仍然与预期值匹配时才会成功。这依赖于这样一个事实:我们的方法都不会修改映射中包含的列表,而是在合并时用新的列表实例替换它们。如果remove(key, value)成功,则可以处理该列表;此时,它已不再包含在地图中。请注意,在 的评估过程中isSomeCondition(keyVal),列表仍然包含在地图中,因此,isSomeCondition(keyVal) 不得修改它,但我认为无论如何测试方法都应该是这种情况isSomeCondition。当然,评估其中的列表isSomeCondition也依赖于其他不修改列表的方法。

\n