实现未知大小的非并行 Spliterator?

tmn*_*tmn 3 java parallel-processing lambda java-8 spliterator

我对我所有的研究感到有点困惑。我有一个名为 TabularResultSet 的自定义接口(为了示例,我已经淡化了它),它遍历本质上为表格的任何数据集。它有一个类似于迭代器的 next() 方法,它可以循环遍历 QueryResultSet、剪贴板中的选项卡式表格、CSV 等...

但是,我正在尝试创建一个 Spliterator 来包装我的 TabularResultSet 并轻松地将其转换为流。我无法想象一种安全的并行化方法,因为 TabularResultSet 可能会遍历 QueryResultSet,并且同时调用 next() 可能会造成严重破坏。我认为可以安全地完成并行化的唯一方法是让单个工作线程调用 next() ,并将数据传递给并行线程来处理它。

所以我认为并行化不是一个简单的选择。我如何在不并行化的情况下让这个东西流式传输?这是我到目前为止的工作...

public final class SpliteratorTest {

    public static void main(String[] args) {
       TabularResultSet rs = null; /* instantiate an implementation; */

       Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    public static interface TabularResultSet {
        public boolean next();

        public List<Object> getData();
    }

    private static final class TabularSpliterator implements Spliterator<TabularResultSet> {

        private final TabularResultSet rs;

        public TabularSpliterator(TabularResultSet rs) {
            this.rs = rs;
        }
        @Override
        public boolean tryAdvance(Consumer<? super TabularResultSet> action) {
            action.accept(rs);
            return rs.next();
        }

        @Override
        public Spliterator<TabularResultSet> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Stu*_*rks 5

这可能是最容易扩展的Spliterators.AbstractSpliterator。如果您这样做,您只需实施tryAdvance. 这可以变成并行流;并行性来自于流实现tryAdvance多次调用、批量接收数据并在不同线程中处理它。

如果TabularResultSet是类似 JDBC 的东西ResultSet,我认为您不需要 aSpliterator<TabularResultSet>或 a Stream<TabularResultSet>。相反,它看起来像 aTabularResultSet代表整个表格数据集,因此您可能希望每个 spliterator 或流元素代表该表中的一行 -由?List<Object>返回的行。getData()如果是这样,你会想要类似下面的东西。

class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
    private final TabularResultSet rs;

    public TabularSpliterator(TabularResultSet rs) {
        super(...);
        this.rs = rs;
    }

    @Override public boolean tryAdvance(Consumer<? super List<Object>> action) {
        if (rs.next()) {
            action.accept(rs.getData());
            return true;
        } else {
            return false;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以通过调用将此 spliterator 的实例转换为流StreamSupport.stream()

注意:一般来说,Spliterator 实例不是从多个线程调用的,甚至不需要是线程安全的。有关详细信息,请参阅以“尽管...”开头的段落中的Spliterator 类文档。

  • @托马斯N。是的,Spliterator 本身不需要关心被并发调用。(我在一些文档链接中进行了编辑。) tryAdvance 方法可能会被多次调用,并将结果传递给不同的线程,因此传递给 `action.accept()` 的结果需要彼此独立。如果 SingletonTabularSet 这样做,你应该没问题。 (2认同)