Java Streams:通过现有流放置新元素

Lás*_*zki 2 java stream java-8 java-stream

我有一个用例,我有一个数据源,比方说:每一秒,一个新的字符串来自该数据源.

我想创建一个管道,如果新的字符串到达​​,它将被推送到该管道进行处理.

我猜Java 8引入的Stream API可以做到这一点,因为它有方便的功能来处理任意集合的数据,但是我想跳过我收集数据的部分到一个单独的集合并派遣到达数据直接到我刚刚创建的Stream.

有没有办法做到这一点?

Tom*_*ski 6

为了做你所描述的,你需要某种阻止.我会使用BlockingQueue(任何类型都可以 - 如果你想避免集合,使用SynchronousQueue,根本没有内部状态),并Stream使用它创建一个无限Stream.generate.

例:

class StreamableQueue<T> {

    private BlockingQueue<T> dataSource;

    Stream<T> asStream() {
        return Stream.generate(this::takeFromDataSource);
    }

    private T takeFromDataSource() {
        try {
            return dataSource.take();
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,BlockingQueue提供dataSource给这个类需要从不同的线程中提供元素.


编辑:一个小的添加 - 而不是使用try-catch,您可以使用: