带阻塞和刷新的非阻塞并发队列

Mar*_*sco 18 java concurrency multithreading

带阻塞和刷新的非阻塞并发队列

我需要一个无限制的非阻塞并发队列,基本上只有2个操作:

  • offer:以原子方式将指定的项插入此队列的尾部;
  • flush:获取队列中当时存在的所有项目,并按照插入顺序逐个处理它们.更具体地说,必须是原子的只是这个"takeAll"操作,这将是冲洗的第一次操作.在takeAll之后提供给队列的所有项目将被插入,然后仅由另一个后续刷新处理.

目标是消费者在takeAll上有一个CAS操作,然后可以迭代列表中的元素,而无需每次读取CAS操作.此外,我们已经拥有Node(Entry),因为这需要存储其他一些不可变状态.新节点可以将HEAD作为构造函数参数,创建单向链接列表.

文献中是否存在具有这些特征的队列?

jta*_*orn 13

干得好:

public class FunkyQueue<T> {
    private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>();

    public void offer(T t) {
        while(true) {
            Node<T> tail = _tail.get();
            Node<T> newTail = new Node<T>(t, tail);
            if(_tail.compareAndSet(tail, newTail)) {
                break;
            }
        }
    }

    public List<T> takeAll() {
        Node<T> tail = _tail.getAndSet(null);

        LinkedList<T> list = new LinkedList<T>();
        while(tail != null) {
            list.addFirst(tail.get());
            tail = tail.getPrevious();
        }

        return list;
    }

    private static final class Node<T>
    {
        private final T _obj;
        private Node<T> _prev;

        private Node(T obj, Node<T> prev) {
            _obj = obj;
            _prev = prev;            
        }

        public T get() {
            return _obj;
        }

        public Node<T> getPrevious() {
            return _prev;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • @MarioFusco - 在这种情况下,你对"非阻塞"版本的渴望可能没有意义,你应该使用某种形式的锁定(正如我在之前的评论中也提到过的). (3认同)

Ale*_*you 5

鉴于:良好的实现,需要一个CAS offer()takeAll().

问题:takeAll()执行时间很长,因为它需要在相反方向上完全遍历单链表.

解决方案:在节点上创建其他跳过级别.对于所提到的数字(N~100K),两个级别就足够了,从而将步数减少takeAll()到~150.

基于上述实现,Node类:

public static final class Node<T> {

    private final T value;
    private Node<T> prev, prevL1, prevL2;
    private Node<T> next, nextL1, nextL2;

    private Node(T obj, Node<T> prev, long c) {
        value = obj;
        this.prev = prev;  
        // level 1 to skip 64 nodes, level 2 to skip 64^2 nodes
        // c is a value from some global addition counter, that
        // is not required to be atomic with `offer()`
        prevL1 = (c & (64 - 1) == 0) ? prev : prev.prevL1;
        prevL2 = (c & (64 * 64 - 1) == 0) ? prev : prev.prevL2;
    }

    public T get() {
        return value;
    }

    public Node<T> findHead() {
        // see below
    }

    public Node<T> next() {
        // see below
    }
}
Run Code Online (Sandbox Code Playgroud)

FunkyQueue#offer() 方法:

public void offer(T t) {
    long c = counter.incrementAndGet();  
    for(;;) {
        Node<T> oldTail = tail.get();
        Node<T> newTail = new Node<T>(t, oldTail, c);
        if (tail.compareAndSet(oldTail, newTail)) 
            break;
    }
}
Run Code Online (Sandbox Code Playgroud)

FunkyQueue#takeAll() 现在将返回列表的头部:

public Node<T> takeAll() {
    return tail.getAndSet(null).findHead();
}
Run Code Online (Sandbox Code Playgroud)

它调用Node#findHead(),现在可以使用跳过级别来加速后向遍历:

private Node<T> findHead() {

     Node<T> n = this;
     while (n.prevL2 != null) {  // <- traverse back on L2, assigning `next` nodes
         n.prevL2.nextL2 = n;
         n = n.prevL2; 
     }
     while (n.prevL1 != null) {  // <- the same for L1
         n.prevL1.nextL1 = n;
         n = n.prev1;
     }
     while (n.prev != null) {    // <- the same for L0
         n.prev.next = n;
         n = n.prev;
     }
     return n;
}
Run Code Online (Sandbox Code Playgroud)

最后,Node#next():

public Node<T> next() {

    if (this.next == null && this.nextL1 == null && this.nextL2 == null)       
        throw new IllegalStateException("No such element");

    Node<T> n;
    if (this.next == null) {         // L0 is not traversed yet
        if (this.nextL1 == null) {   // the same for L1
            n = this.nextL2;         // step forward on L2
            while (n != this) {      // traverse on L1
                n.prevL1.nextL1 = n;
                n = n.prevL1;
            }
        }  
        n = this.nextL1;             // step forward on L1
        while (n != this) {          // traverse on L0
            n.prev.next = n;
            n = n.prev;
        }
    }
    return this.next;
}
Run Code Online (Sandbox Code Playgroud)

我想主要想法很明确.应用一些重构,可以制作Node#findHead()并因此FunkyQueue#takeAll()在O(log N)和Node#next()O(1)中平均运行.


PS如果注意到一些错误或错误的语法,请编辑.