并发集队列

Ste*_*rla 33 java queue collections concurrency set

也许这是一个愚蠢的问题,但我似乎找不到明显的答案.

我需要一个只包含唯一值的并发FIFO队列.尝试添加队列中已存在的值只会忽略该值.哪个,如果不是线程安全将是微不足道的.Java中是否存在数据结构,或者是否存在表现出此行为的互联网上的代码片段?

Kev*_*ion 6

如果你想要比完全同步更好的并发性,我有一种方法可以做到这一点,使用ConcurrentHashMap作为支持映射.以下仅为草图.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}
Run Code Online (Sandbox Code Playgroud)

用这种方法一切都不是阳光.除了使用支持贴图之外entrySet().iterator().next(),我们没有合适的方法来选择头部元素,结果是随着时间的推移,地图变得越来越不平衡.由于更大的桶冲突和更大的段争用,这种不平衡是一个问题.

注意:此代码在一些地方使用Guava.

  • 这如何保持`Queue`的顺序?迭代顺序取决于支持映射的实现. (3认同)

eri*_*son 5

没有内置的集合可以做到这一点.有一些并发Set实现可以与并发使用Queue.

例如,只有在将项成功添加到集合后才将项添加到队列中,并从集中删除从队列中删除的每个项.在这种情况下,队列的内容在逻辑上实际上是集合中的任何内容,并且队列仅用于跟踪顺序并提供仅在a上找到的有效take()poll()操作BlockingQueue.


Ben*_*nes 5

我会使用同步的 LinkedHashSet,直到有足够的理由来考虑替代方案。更并发的解决方案可以提供的主要好处是锁分裂。

最简单的并发方法是一个 ConcurrentHashMap(作为一个集合)和一个 ConcurrentLinkedQueue。操作的顺序将提供所需的约束。offer() 将首先执行 CHM#putIfAbsent(),如果成功插入到 CLQ。poll() 将从 CLQ 中获取,然后将其从 CHM 中删除。这意味着我们考虑队列中的条目,如果它在映射中并且 CLQ 提供排序。然后可以通过增加地图的 concurrencyLevel 来调整性能。如果你能容忍额外的 racy-ness,那么便宜的 CHM#get() 可以作为一个合理的先决条件(但它可能会因为有点陈旧的观点而受到影响)。