Bre*_*yan 9 java multithreading java-stream
parallel()
然而,今天我使用的是在地图之后执行操作的流;底层源是一个非线程安全的迭代器,类似于BufferedReader.lines实现。
我原本以为 trySplit 会在创建的线程上调用,但是;我观察到对迭代器的访问来自多个线程。
例如,以下愚蠢的迭代器实现只是设置了足够的元素来导致分裂,并且还跟踪访问该hasNext
方法的唯一线程。
class SillyIterator implements Iterator<String> {
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() { return ts.keySet(); }
private String nextRecord = null;
@Override
public boolean hasNext() {
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null) {
return true;
} else {
nextRecord = src.poll();
return nextRecord != null;
}
}
@Override
public String next() {
if (nextRecord != null || hasNext()) {
var rec = nextRecord;
nextRecord = null;
return rec;
}
throw new NoSuchElementException();
}
}
Run Code Online (Sandbox Code Playgroud)
使用它来创建流,如下所示:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " + n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
Run Code Online (Sandbox Code Playgroud)
在我的系统上,这输出了两个 fork join 线程以及主线程,这有点吓到我了。
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
Run Code Online (Sandbox Code Playgroud)
线程安全并不一定意味着只能由一个线程访问。重要的一点是不存在并发访问,即不能同时由多个线程进行访问。如果不同线程的访问是按时间顺序排列的,并且这种顺序也确保了必要的内存可见性(这是调用者的责任),那么它仍然是线程安全的使用。
\n\n\n\n尽管它们在并行算法中具有明显的实用性,但分割器预计不会是线程安全的;相反,使用 spliterator 的并行算法的实现应确保 spliterator 一次仅由一个线程使用。这通常很容易通过串行线程限制来实现,这通常是通过递归分解工作的典型并行算法的自然结果。
\n
分裂器在其整个生命周期中不需要被限制在同一个线程中,但是在调用者端应该有一个明确的切换,确保旧线程在新线程之前停止使用它开始使用它。
\n但重要的是,分裂器不需要是线程安全的,因此,分裂器包装的迭代器也不需要是线程安全的。
\n请注意,典型的行为是在开始遍历之前进行拆分和移交,但由于普通的Iterator
\xe2\x80\x99 不支持拆分,因此包装拆分器必须迭代并缓冲元素来实现拆分。因此,Iterator
从实现\xe2\x80\x99s的角度来看,在尚未开始遍历时,会经历不同线程(但一次一个)的遍历Stream
。
也就是说,lines()
它的实现BufferedReader
是一个不好的例子,你不应该遵循。由于 it\xe2\x80\x99s 以单个readLine()
调用为中心,因此很自然地直接实现Spliterator
而不是实现更复杂的Iterator
并通过spliteratorUnknownSize(\xe2\x80\xa6)
.
由于您的示例同样以单个poll()
调用为中心,因此它\xe2\x80\x99s也可以直接直接实现Spliterator
:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {\n private final ArrayDeque<String> src = IntStream.range(1, 10000)\n .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));\n\n SillySpliterator() {\n super(Long.MAX_VALUE, ORDERED | NONNULL);\n }\n\n @Override\n public boolean tryAdvance(Consumer<? super String> action) {\n String nextRecord = src.poll();\n if(nextRecord == null) return false;\n action.accept(nextRecord);\n return true;\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n根据您的实际情况,您还可以将实际双端队列大小传递给构造函数并提供特征SIZED
。
然后,你可以像这样使用它
\nvar result = StreamSupport.stream(new SillySpliterator(), true)\n .map(n -> "value = " + n)\n .collect(toList());\n
Run Code Online (Sandbox Code Playgroud)\n
归档时间: |
|
查看次数: |
370 次 |
最近记录: |