Java 8集合提供了将集合作为流获取的功能.但是,一旦我们调用stream()方法,我们就会以集合的形式获取集合的当前内容.如果我的收藏在流处理期间增长怎么办?流上的操作可能会使用更多数据更新集合.有没有一种简单有效的方法来处理这种情况?
(我在流处理操作中尝试了Stream.concat()但是我得到异常:线程"main"中的异常java.lang.IllegalStateException:stream已经被操作或关闭了)
举一个具体的例子,假设我有一个并发的url队列.
Queue<Url> concurrentUrlQue= initUrlQueue();
Run Code Online (Sandbox Code Playgroud)
现在我想获取此url队列的流,并逐个处理URL.该过程涉及从队列中删除URL,读取URL指向的网页,从页面中提取URL以及将这些URL添加到并发队列中.
concurrentUrlQue.stream().forEach((url)->readAndExtractUrls(url, concurrentUrlQue));
Run Code Online (Sandbox Code Playgroud)
我希望能够将上述动态增长的队列作为流处理.(此外,我希望能够使用并行流处理此动态队列)
有没有一种使用java流实现这一目标的简单方法?
小智 5
你需要编写spliterator来阻止等待新元素.
class QueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
private final BlockingQueue<T> queue;
public QueueSpliterator(BlockingQueue<T> queue) {
super(Long.MAX_VALUE, 0);
this.queue = queue;
}
public boolean tryAdvance(Consumer<? super T> action) {
try {
T element = queue.take();
action.accept(element);
return true;
} catch (InterruptedException e) {
return false;
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后使用该分割器创建一个流,并像普通的无限流一样处理它.
public class Main {
public static void main(String... args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
new Thread(() -> {
for (int i = 0; i < 1000; ++i) {
try {
queue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
Spliterator<Integer> queueSpliterator = new QueueSpliterator<>(queue);
Stream<Integer> stream = StreamSupport.stream(queueSpliterator, false);
stream.forEach(System.out::println);
}
}
Run Code Online (Sandbox Code Playgroud)