多个线程在单个流中工作

Joh*_*ith 3 java concurrency multithreading java-8 java-stream

我想知道以下是否有效:

class SomeCalc {

    AtomicLong   someResultA;
    AtomicDouble someResultB;

    public SomeCalc(List<Something> someList) {

        final Stream<Something> someStream = someList.stream();

        new Thread(() -> {
            long result = someStream.mapToLong(something -> something.getSomeLong()).sum();
            someResultA.set(result);
        }).start();

        new Thread(() -> {
            double result = someStream.mapToDouble(something -> (double) something.getSomeLong() / something.getAnotherLong()).sum();
            someResultB.set(result);
        }).start();
    }
}
Run Code Online (Sandbox Code Playgroud)

只要结果如本例中的原子那样,这会很好吗?或者在此过程中是否会有任何ConcurrentAccessModification例外情况或其他问题会出错?

Lin*_*ica 7

它不会工作.你会得到一个:

IllegalStateException: Stream has already been operated upon or closed
Run Code Online (Sandbox Code Playgroud)

因为每个流只能遍历一次.这也可以在javadoc中读取java.util.stream.Stream<T>:

流只能 [...] 上运行一次.例如,这排除了"分叉"流,其中相同的源提供两个或更多个管道,或者同一个流的多个遍历.如果流实现检测到正在重用流,则它可能会抛出IllegalStateException.但是,由于某些流操作可能返回其接收器而不是新的流对象,因此可能无法在所有情况下检测重用.

我想你的例子是简化的.所以你正在准备流,然后让2个线程做额外的东西.

如果你还想保留它.您可以创建该流的供应商,然后让线程获得2个不同的实例:

final Supplier<Stream<Something>> provider = () -> someList.stream(); // and maybe more operations

new Thread(() -> {
    long result = provider.get()  // get an instance
       .mapToLong(something -> something.getSomeLong())
       .sum();
    someResultA.set(result);
}).start();

new Thread(() -> {
    double result = provider.get() // get another instance
         .mapToDouble(something -> (double) something.getSomeLong() / something.getAnotherLong())
         .sum();
    someResultB.set(result);
}).start();
Run Code Online (Sandbox Code Playgroud)