在Java8 parallelStream()中使用I/O + ManagedBlocker有什么问题吗?

yan*_*kee 18 java java-stream

Java 8中的默认"paralellStream()"使用公共ForkJoinPool,如果在提交任务时公共池线程用尽,则可能是延迟问题.但是,在许多情况下,有足够的CPU功率可用且任务足够短,因此这不是问题.如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但对于这个问题,让我们假设这不是问题.

但是ForkJoinPool,即使有足够的CPU功率,填充实际上不执行任何CPU绑定工作的I/O任务也是一种引入瓶颈的方法.我明白了.然而,这就是我们所拥有的ManagedBlocker.因此,如果我们有一个I/O任务,我们应该只允许ForkJoinPool在一个内部管理它ManagedBlocker.听起来非常简单.但令我惊讶的ManagedBlocker是,使用一个相当复杂的API来实现简单的事情.毕竟我认为这是一个常见的问题.所以我只是构建了一个简单的实用方法,使其ManagedBlocker易于用于常见情况:

public class BlockingTasks {

    public static<T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,如果我想在paralell下载几个网站的html代码,我可以像这样没有I/O造成任何麻烦:

public static void main(String[] args) {
    final List<String> pagesHtml = Stream
        .of("https://google.com", "https://stackoverflow.com", "...")
        .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
        .collect(Collectors.toList());
}
Run Code Online (Sandbox Code Playgroud)

我有点惊讶的是,BlockingTasks上面没有像Java 一样附带的类(或者我没有找到它?),但是构建起来并不难.

当我谷歌搜索"java 8并行流"时,我在前四个结果中得到那些声称由于I/O问题Fork/Join糟透了Java的文章:

我在某种程度上改变了我的搜索条件,虽然有很多人在抱怨生活多么可怕,但我发现没有人会像上面那样谈论解决方案.由于我不喜欢Marvin(像行星一样的大脑)和Java 8可用了很长一段时间,我怀疑我在那里建议的东西有些严重错误.

我拼凑了一个小测试:

public static void main(String[] args) {
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}

public static void sleep() {
    try {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        throw new Error(e);
    }
}
Run Code Online (Sandbox Code Playgroud)

我跑了,得到了以下结果:

18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
Run Code Online (Sandbox Code Playgroud)

因此,在我的8 CPU计算机上,ForkJoinPool自然选择8个线程,完成前8个任务,最后完成最后两个任务,这意味着这需要20秒,如果有其他任务排队,则池仍然可能没有使用明显空闲的CPU(除了最后10秒内有6个核心).

然后我用...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
Run Code Online (Sandbox Code Playgroud)

...代替...

IntStream.range(0, 10).parallel().forEach((x) -> sleep());
Run Code Online (Sandbox Code Playgroud)

......并得到以下结果:

18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
Run Code Online (Sandbox Code Playgroud)

它看起来像这样工作,额外的线程开始补偿我的模拟"阻止I/O操作"(睡眠).时间减少到10秒,我想如果我排队更多的任务,那些仍然可以使用可用的CPU功率.

如果I/O操作包含在一个ManagedBlock?中,这个解决方案是否有任何问题,或者通常在流中使用I/O ?

Tag*_*eev 12

简而言之,是的,您的解决方案存在一些问题.它肯定改进了在并行流中使用阻塞代码,并且一些第三方库提供了类似的解决方案(例如,参见BlockingjOOλ库中的类).但是,此解决方案不会更改Stream API中使用的内部拆分策略.Stream API创建的子任务数由AbstractTask类中的预定义常量控制:

/**
 * Default target factor of leaf tasks for parallel decomposition.
 * To allow load balancing, we over-partition, currently to approximately
 * four tasks per processor, which enables others to help out
 * if leaf tasks are uneven or some processors are otherwise busy.
 */
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,它比普通池并行性(默认为CPU核心数)大四倍.真正的分裂算法有点棘手,但即使所有这些任务都是阻塞的,你也不可能拥有超过4x-8x的任务.

例如,如果您有8个CPU内核,那么您的Thread.sleep()测试可以很好地工作IntStream.range(0, 32)(如32 = 8*4).但是,对于IntStream.range(0, 64)您将有32个并行任务,每个处理两个输入数字,因此整个处理将花费20秒,而不是10秒.

  • 不要忘记:Stream API使用Fork/Join是一个实现细节.只要Streams不能保证使用该框架,就没有保证使用`ManagedBlocker`会改善并发性...... (3认同)