即时向Java 8并行Streams添加元素

L.M*_*ckl 9 java parallel-processing concurrency multithreading java-stream

目标是在Java 8流的帮助下处理连续的元素流.因此,在处理该流时,将元素添加到并行流的数据源中.

StreamsJavadoc在"无干扰"部分中描述了以下属性:

对于大多数数据源,防止干扰意味着确保在流管道的执行期间根本不修改数据源.值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改.并发流源是Spliterator报告CONCURRENT特性的源.

这就是在我们的尝试中使用ConcurrentLinkedQueue的原因,它返回true

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
Run Code Online (Sandbox Code Playgroud)

没有明确说明,在并行流中使用时不得修改数据源.

在我们的示例中,对于流中的每个元素,递增的计数器值被添加到队列中,该队列是流的数据源,直到计数器大于N.通过调用queue.stream(),一切正常,顺序执行:

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

作为第二次尝试,流是并行的并抛出java.lang.AssertionError,因为check小于N并且并非处理队列中的每个元素.流可能已提前完成执行,因为队列可能在某个时间点变空.

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

下一次尝试是在连续流"真正"结束(队列为空)时发出主线程信号,然后关闭流对象.这里的问题是流对象似乎只从队列中读取一次或至少不连续地读取元素,并且永远不会到达流的"真实"端.

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();

        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                cond.signal();
            }
            lock.unlock();
        });

        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

由此产生的问题是:

  • 我们做错了吗?
  • 它是Stream API的未指定甚至错误用法吗?
  • 我们怎样才能达到理想的行为呢?

Pet*_*rey 0

流可以连续生成或从修改的集合中生成,也不是设计为连续运行的。它旨在处理流启动时可用的元素,并在处理完后返回。一旦到达终点,它就会停止。

否则我们如何才能实现所需的行为呢?

您需要使用不同的方法。我会使用ExecutorService您要执行的提交任务。

另一种方法是使用连续流,当没有可用结果时该流会阻塞。注意:这将锁定ForkJoinPool并行流使用的 Common,并且其他代码无法使用它。