小编L.M*_*ckl的帖子

即时向Java 8并行Streams添加元素

目标是在Java 8流的帮助下处理连续的元素流.因此,在处理该流时,将元素添加到并行流的数据源中.

StreamsJavadoc在"无干扰"部分中描述了以下属性:

对于大多数数据源,防止干扰意味着确保在流管道的执行期间根本不修改数据源.值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改.并发流源是Spliterator报告CONCURRENT特性的源.

这就是在我们的尝试中使用ConcurrentLinkedQueue的原因,它返回true

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
Run Code Online (Sandbox Code Playgroud)

没有明确说明,在并行流中使用时不得修改数据源.

在我们的示例中,对于流中的每个元素,递增的计数器值被添加到队列中,该队列是流的数据源,直到计数器大于N.通过调用queue.stream(),一切正常,顺序执行:

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency multithreading java-stream

9
推荐指数
1
解决办法
1053
查看次数