并行流是否以线程安全的方式处理上游迭代器?

Bre*_*yan 9 java multithreading java-stream

parallel()然而,今天我使用的是在地图之后执行操作的流;底层源是一个非线程安全的迭代器,类似于BufferedReader.lines实现。

我原本以为 trySplit 会在创建的线程上调用,但是;我观察到对迭代器的访问来自多个线程。

例如,以下愚蠢的迭代器实现只是设置了足够的元素来导致分裂,并且还跟踪访问该hasNext方法的唯一线程。

class SillyIterator implements Iterator<String> {

    private final ArrayDeque<String> src =
        IntStream.range(1, 10000)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayDeque::new));
    private Map<String, String> ts = new ConcurrentHashMap<>();
    public Set<String> threads() { return ts.keySet(); }
    private String nextRecord = null;

    @Override
    public boolean hasNext() {
        var n = Thread.currentThread().getName();
        ts.put(n, n);
        if (nextRecord != null) {
            return true;
        } else {
            nextRecord = src.poll();
            return nextRecord != null;
        }
    }
    @Override
    public String next() {
        if (nextRecord != null || hasNext()) {
            var rec = nextRecord;
            nextRecord = null;
            return rec;
        }
        throw new NoSuchElementException();
    }

}
Run Code Online (Sandbox Code Playgroud)

使用它来创建流,如下所示:

var iter = new SillyIterator();
StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(
        iter, Spliterator.ORDERED | Spliterator.NONNULL
    ), false)
    .map(n -> "value = " + n)
    .parallel()
    .collect(toList());

System.out.println(iter.threads());
Run Code Online (Sandbox Code Playgroud)

在我的系统上,这输出了两个 fork join 线程以及主线程,这有点吓到我了。

[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 4

线程安全并不一定意味着只能由一个线程访问。重要的一点是不存在并发访问,即不能同时由多个线程进行访问。如果不同线程的访问是按时间顺序排列的,并且这种顺序也确保了必要的内存可见性(这是调用者的责任),那么它仍然是线程安全的使用。

\n

文档Spliterator

\n
\n

尽管它们在并行算法中具有明显的实用性,但分割器预计不会是线程安全的;相反,使用 spliterator 的并行算法的实现应确保 spliterator 一次仅由一个线程使用。这通常很容易通过串行线程限制来实现,这通常是通过递归分解工作的典型并行算法的自然结果。

\n
\n

分裂器在其整个生命周期中不需要被限制在同一个线程中,但是在调用者端应该有一个明确的切换,确保旧线程在新线程之前停止使用它开始使用它。

\n

但重要的是,分裂器不需要是线程安全的,因此,分裂器包装的迭代器也不需要是线程安全的。

\n

请注意,典型的行为是在开始遍历之前进行拆分和移交,但由于普通的Iterator\xe2\x80\x99 不支持拆分,因此包装拆分器必须迭代并缓冲元素来实现拆分。因此,Iterator从实现\xe2\x80\x99s的角度来看,在尚未开始遍历时,会经历不同线程(但一次一个)的遍历Stream

\n
\n

也就是说,lines()它的实现BufferedReader是一个不好的例子,你不应该遵循。由于 it\xe2\x80\x99s 以单个readLine()调用为中心,因此很自然地直接实现Spliterator而不是实现更复杂的Iterator并通过spliteratorUnknownSize(\xe2\x80\xa6).

\n

由于您的示例同样以单个poll()调用为中心,因此它\xe2\x80\x99s也可以直接直接实现Spliterator

\n
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {\n    private final ArrayDeque<String> src = IntStream.range(1, 10000)\n        .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));\n\n    SillySpliterator() {\n        super(Long.MAX_VALUE, ORDERED | NONNULL);\n    }\n\n    @Override\n    public boolean tryAdvance(Consumer<? super String> action) {\n        String nextRecord = src.poll();\n        if(nextRecord == null) return false;\n        action.accept(nextRecord);\n        return true;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

根据您的实际情况,您还可以将实际双端队列大小传递给构造函数并提供特征SIZED

\n

然后,你可以像这样使用它

\n
var result = StreamSupport.stream(new SillySpliterator(), true)\n    .map(n -> "value = " + n)\n    .collect(toList());\n
Run Code Online (Sandbox Code Playgroud)\n

  • 我以为这只是一个例子。这就是为什么我建议“根据您的现实生活案例”添加大小特征,而不是仅仅将其添加到我的示例中。我还试图强调直接实现 spliterator 背后的一般模式。每当你有某种“*获取下一个元素或告诉我没有更多*”操作时,例如`readLine()`、`poll()`、`fetchNext()`等,`Spliterator`就简单得多来实施。与“Iterator”相比,即使使用“Spliterator”的两种方法 API 也并不困难。这只是熟悉它的问题。 (2认同)