将Java流拆分为两个惰性流,无需终端操作

Tor*_*que 5 java java-stream

据我所知,一般来说Java流不会拆分.但是,我们有一个参与且冗长的管道,最后我们有两种不同类型的处理,它们共享管道的第一部分.

由于数据的大小,存储中间流产品不是可行的解决方案.两次都没有运行管道.

基本上,我们正在寻找的是一种解决方案,它是一个流上的操作,产生两个(或更多)流,这些流被延迟填充并且能够并行使用.我的意思是,如果流A被分成流B和C,当流B和C消耗10个元素时,流A消耗并提供这10个元素,但是如果流B然后尝试消耗更多元素,则它将阻塞直到流C也消耗它们.

是否有针对此问题的预制解决方案或我们可以查看的任何库?如果没有,我们将在哪里开始研究是否要自己实施?还是有一个令人信服的理由根本没有实施?

Mal*_*wig 3

您可以实现自定义Spliterator来实现此类行为。我们会将您的流分成公共“源”和不同的“消费者”。然后,自定义分割器将元素从源转发到每个使用者。为此,我们将使用 a BlockingQueue(请参阅此问题)。

请注意,这里的困难部分不是 spliterator/stream,而是队列周围消费者的同步,正如对您问题的评论已经表明的那样。尽管如此,无论您如何实现同步,Spliterator都有助于使用流。

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

private static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>   sourceSpliterator;

    private BlockingQueue<T> queue      = new LinkedBlockingQueue<>();

    private AtomicInteger    nextToTake = new AtomicInteger(0);
    private AtomicInteger    processed  = new AtomicInteger(0);

    private boolean          sourceDone;
    private int              consumerCount;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();
        consumerCount = consumers.length;

        for (int i = 0; i < consumers.length; i++)
        {
            int index = i;
            Consumer<Stream<T>> consumer = consumers[i];
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false));
                }
            }).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        sourceDone = !sourceSpliterator.tryAdvance(queue::offer);
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private int index;

        private ForkedConsumer(int index)
        {
            super(Long.MAX_VALUE, 0);

            this.index = index;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // take next element when it's our turn
            while (!nextToTake.compareAndSet(index, index + 1))
            {
            }
            T element;
            while ((element = queue.peek()) == null)
            {
                if (sourceDone)
                {
                    // element is null, and there won't be no more, so "terminate" this sub stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(element);

            if (consumerCount == processed.incrementAndGet())
            {
                // start next round
                queue.poll();
                processed.set(0);
                nextToTake.set(0);
            }

            return true;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用这种方法,消费者可以并行处理每个元素,但在开始处理下一个元素之前会相互等待。

已知问题 如果其中一个消费者比其他消费者“短”(例如因为它调用limit()),它也会停止其他消费者并使线程挂起。


例子

public static void sleep(long millis)
{
    try { Thread.sleep((long) (Math.random() * 30 + millis)); } catch (InterruptedException e) { }
}

streamForked(Stream.of("1", "2", "3", "4", "5"),
             source -> source.map(word -> { sleep(50); return "fast   " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(300); return "slow      " + word; }).forEach(System.out::println),
             source -> source.map(word -> { sleep(50); return "2fast        " + word; }).forEach(System.out::println));

fast   1
2fast        1
slow      1
fast   2
2fast        2
slow      2
2fast        3
fast   3
slow      3
fast   4
2fast        4
slow      4
2fast        5
fast   5
slow      5
Run Code Online (Sandbox Code Playgroud)