据我所知,一般来说Java流不会拆分.但是,我们有一个参与且冗长的管道,最后我们有两种不同类型的处理,它们共享管道的第一部分.
由于数据的大小,存储中间流产品不是可行的解决方案.两次都没有运行管道.
基本上,我们正在寻找的是一种解决方案,它是一个流上的操作,产生两个(或更多)流,这些流被延迟填充并且能够并行使用.我的意思是,如果流A被分成流B和C,当流B和C消耗10个元素时,流A消耗并提供这10个元素,但是如果流B然后尝试消耗更多元素,则它将阻塞直到流C也消耗它们.
是否有针对此问题的预制解决方案或我们可以查看的任何库?如果没有,我们将在哪里开始研究是否要自己实施?还是有一个令人信服的理由根本没有实施?
您可以实现自定义Spliterator来实现此类行为。我们会将您的流分成公共“源”和不同的“消费者”。然后,自定义分割器将元素从源转发到每个使用者。为此,我们将使用 a BlockingQueue(请参阅此问题)。
请注意,这里的困难部分不是 spliterator/stream,而是队列周围消费者的同步,正如对您问题的评论已经表明的那样。尽管如此,无论您如何实现同步,Spliterator都有助于使用流。
@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}
private static class ForkingSpliterator<T>
extends AbstractSpliterator<T>
{
private Spliterator<T> sourceSpliterator;
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private AtomicInteger nextToTake = new AtomicInteger(0);
private AtomicInteger processed = new AtomicInteger(0);
private boolean sourceDone;
private int consumerCount;
@SafeVarargs
private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
{
super(Long.MAX_VALUE, 0);
sourceSpliterator = source.spliterator();
consumerCount = consumers.length;
for (int i = 0; i < consumers.length; i++)
{
int index = i;
Consumer<Stream<T>> consumer = consumers[i];
new Thread(new Runnable()
{
@Override
public void run()
{
consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false));
}
}).start();
}
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
sourceDone = !sourceSpliterator.tryAdvance(queue::offer);
return !sourceDone;
}
private class ForkedConsumer
extends AbstractSpliterator<T>
{
private int index;
private ForkedConsumer(int index)
{
super(Long.MAX_VALUE, 0);
this.index = index;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
// take next element when it's our turn
while (!nextToTake.compareAndSet(index, index + 1))
{
}
T element;
while ((element = queue.peek()) == null)
{
if (sourceDone)
{
// element is null, and there won't be no more, so "terminate" this sub stream
return false;
}
}
// push to consumer pipeline
action.accept(element);
if (consumerCount == processed.incrementAndGet())
{
// start next round
queue.poll();
processed.set(0);
nextToTake.set(0);
}
return true;
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用这种方法,消费者可以并行处理每个元素,但在开始处理下一个元素之前会相互等待。
已知问题
如果其中一个消费者比其他消费者“短”(例如因为它调用limit()),它也会停止其他消费者并使线程挂起。
例子
public static void sleep(long millis)
{
try { Thread.sleep((long) (Math.random() * 30 + millis)); } catch (InterruptedException e) { }
}
streamForked(Stream.of("1", "2", "3", "4", "5"),
source -> source.map(word -> { sleep(50); return "fast " + word; }).forEach(System.out::println),
source -> source.map(word -> { sleep(300); return "slow " + word; }).forEach(System.out::println),
source -> source.map(word -> { sleep(50); return "2fast " + word; }).forEach(System.out::println));
fast 1
2fast 1
slow 1
fast 2
2fast 2
slow 2
2fast 3
fast 3
slow 3
fast 4
2fast 4
slow 4
2fast 5
fast 5
slow 5
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
209 次 |
| 最近记录: |