相关疑难解决方法(0)

Java 8并行流中的自定义线程池

是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.

想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.

如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.

请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-8 java-stream

371
推荐指数
9
解决办法
15万
查看次数

Reader#lines()由于其spliterator中的不可配置的批量大小策略而严重并行化

当流源是a时,我无法实现流处理的良好并行化Reader.在四核CPU上运行下面的代码我首先观察到3个核心,然后突然下降到两个核心,然后是一个核心.整体CPU利用率徘徊在50%左右.

请注意示例的以下特征:

  • 只有6,000行;
  • 每条线需要大约20毫秒来处理;
  • 整个过程大约需要一分钟.

这意味着所有压力都在CPU上,I/O很小.这个例子是一个用于自动并行化的坐鸭.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing java-8

14
推荐指数
2
解决办法
1183
查看次数