Mar*_*sco 18 java concurrency multithreading
带阻塞和刷新的非阻塞并发队列
我需要一个无限制的非阻塞并发队列,基本上只有2个操作:
目标是消费者在takeAll上有一个CAS操作,然后可以迭代列表中的元素,而无需每次读取CAS操作.此外,我们已经拥有Node(Entry),因为这需要存储其他一些不可变状态.新节点可以将HEAD作为构造函数参数,创建单向链接列表.
文献中是否存在具有这些特征的队列?
jta*_*orn 13
干得好:
public class FunkyQueue<T> {
private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>();
public void offer(T t) {
while(true) {
Node<T> tail = _tail.get();
Node<T> newTail = new Node<T>(t, tail);
if(_tail.compareAndSet(tail, newTail)) {
break;
}
}
}
public List<T> takeAll() {
Node<T> tail = _tail.getAndSet(null);
LinkedList<T> list = new LinkedList<T>();
while(tail != null) {
list.addFirst(tail.get());
tail = tail.getPrevious();
}
return list;
}
private static final class Node<T>
{
private final T _obj;
private Node<T> _prev;
private Node(T obj, Node<T> prev) {
_obj = obj;
_prev = prev;
}
public T get() {
return _obj;
}
public Node<T> getPrevious() {
return _prev;
}
}
}
Run Code Online (Sandbox Code Playgroud)
鉴于:良好的实现,需要一个CAS offer()
和takeAll()
.
问题:takeAll()
执行时间很长,因为它需要在相反方向上完全遍历单链表.
解决方案:在节点上创建其他跳过级别.对于所提到的数字(N~100K),两个级别就足够了,从而将步数减少takeAll()
到~150.
基于上述实现,Node
类:
public static final class Node<T> {
private final T value;
private Node<T> prev, prevL1, prevL2;
private Node<T> next, nextL1, nextL2;
private Node(T obj, Node<T> prev, long c) {
value = obj;
this.prev = prev;
// level 1 to skip 64 nodes, level 2 to skip 64^2 nodes
// c is a value from some global addition counter, that
// is not required to be atomic with `offer()`
prevL1 = (c & (64 - 1) == 0) ? prev : prev.prevL1;
prevL2 = (c & (64 * 64 - 1) == 0) ? prev : prev.prevL2;
}
public T get() {
return value;
}
public Node<T> findHead() {
// see below
}
public Node<T> next() {
// see below
}
}
Run Code Online (Sandbox Code Playgroud)
FunkyQueue#offer()
方法:
public void offer(T t) {
long c = counter.incrementAndGet();
for(;;) {
Node<T> oldTail = tail.get();
Node<T> newTail = new Node<T>(t, oldTail, c);
if (tail.compareAndSet(oldTail, newTail))
break;
}
}
Run Code Online (Sandbox Code Playgroud)
FunkyQueue#takeAll()
现在将返回列表的头部:
public Node<T> takeAll() {
return tail.getAndSet(null).findHead();
}
Run Code Online (Sandbox Code Playgroud)
它调用Node#findHead()
,现在可以使用跳过级别来加速后向遍历:
private Node<T> findHead() {
Node<T> n = this;
while (n.prevL2 != null) { // <- traverse back on L2, assigning `next` nodes
n.prevL2.nextL2 = n;
n = n.prevL2;
}
while (n.prevL1 != null) { // <- the same for L1
n.prevL1.nextL1 = n;
n = n.prev1;
}
while (n.prev != null) { // <- the same for L0
n.prev.next = n;
n = n.prev;
}
return n;
}
Run Code Online (Sandbox Code Playgroud)
最后,Node#next()
:
public Node<T> next() {
if (this.next == null && this.nextL1 == null && this.nextL2 == null)
throw new IllegalStateException("No such element");
Node<T> n;
if (this.next == null) { // L0 is not traversed yet
if (this.nextL1 == null) { // the same for L1
n = this.nextL2; // step forward on L2
while (n != this) { // traverse on L1
n.prevL1.nextL1 = n;
n = n.prevL1;
}
}
n = this.nextL1; // step forward on L1
while (n != this) { // traverse on L0
n.prev.next = n;
n = n.prev;
}
}
return this.next;
}
Run Code Online (Sandbox Code Playgroud)
我想主要想法很明确.应用一些重构,可以制作Node#findHead()
并因此FunkyQueue#takeAll()
在O(log N)和Node#next()
O(1)中平均运行.
PS如果注意到一些错误或错误的语法,请编辑.