目标是在Java 8流的帮助下处理连续的元素流.因此,在处理该流时,将元素添加到并行流的数据源中.
Streams的Javadoc在"无干扰"部分中描述了以下属性:
对于大多数数据源,防止干扰意味着确保在流管道的执行期间根本不修改数据源.值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改.并发流源是Spliterator报告CONCURRENT特性的源.
这就是在我们的尝试中使用ConcurrentLinkedQueue的原因,它返回true
new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
Run Code Online (Sandbox Code Playgroud)
没有明确说明,在并行流中使用时不得修改数据源.
在我们的示例中,对于流中的每个元素,递增的计数器值被添加到队列中,该队列是流的数据源,直到计数器大于N.通过调用queue.stream(),一切正常,顺序执行:
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class StreamTest {
public static void main(String[] args) {
final int N = 10000;
assertEquals(N, testSequential(N));
}
public static int testSequential(int N) {
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger check = new AtomicInteger(0);
final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i = 0; i < N / 10; ++i) {
queue.add(counter.incrementAndGet());
}
Stream<Integer> …Run Code Online (Sandbox Code Playgroud) java parallel-processing concurrency multithreading java-stream