流API和队列:订阅BlockingQueue流式

Mik*_*sky 23 java java-8 java-stream

假设我们有一个队列

BlockingQueue<String> queue= new LinkedBlockingQueue<>();
Run Code Online (Sandbox Code Playgroud)

而其他一些线程将值放入其中,然后我们就像读取它一样

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}
Run Code Online (Sandbox Code Playgroud)

如何以流样式迭代此队列,同时保持与上述代码类似的语义.

此代码仅遍历当前队列状态:

queue.stream().forEach(e -> System.out.println(e));
Run Code Online (Sandbox Code Playgroud)

Stu*_*rks 32

我猜你有点期待,但我觉得我有一个很好的预感.

队列流(如遍历队列)表示队列的当前内容.当迭代器或流到达队列的尾部时,它不会阻止等待添加的其他元素.迭代器或流在该点耗尽,计算终止.

如果您想要一个包含队列中所有当前和未来元素的流,您可以执行以下操作:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   
Run Code Online (Sandbox Code Playgroud)

(不幸的是需要处理InterruptedException这使得它非常混乱.)

请注意,无法关闭队列,也无法Stream.generate停止生成元素,因此这实际上是一个无限流.终止它的唯一方法是使用短路流操作,例如findFirst.


Joh*_*ean 14

您可以查看异步队列实现.如果你有Java 8,那么独眼巨人反应,我是这个项目的开发人员,提供了一个async.Queue,它允许你同步地(并且干净地)填充和使用队列.

例如

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
Run Code Online (Sandbox Code Playgroud)

或者简单地说(只要这是一个com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();
Run Code Online (Sandbox Code Playgroud)

然后在一个单独的线程中:

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();
Run Code Online (Sandbox Code Playgroud)

回到主线程,您的原始代码现在应该按预期工作(不经意地迭代添加到队列中的消息并将其打印出来)

queue.stream().forEach(e -> System.out.println(e));
Run Code Online (Sandbox Code Playgroud)

Queue和Stream可以在任何阶段关闭 -

queue.close();
Run Code Online (Sandbox Code Playgroud)


小智 11

另一种方法是构建自定义Spliterator.在我的情况下,我有一个阻塞队列,我想构建一个继续提取元素的流,直到块超时.分裂器是这样的:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}
Run Code Online (Sandbox Code Playgroud)

处理InterruptedException抛出的异常是RuntimeException的扩展.使用此类,可以通过以下方法构建流:StreamSupport.stream(new QueueSpliterator(...))并添加常用的流操作.