强制Java流尽早执行管道的一部分,以将阻塞任务提交给线程池

sil*_*eth 5 java java-stream

我有一个我想要处理的对象列表,Java8流API看起来是最干净和可读的方式.

但是我需要对这些对象执行的一些操作包括阻塞IO(比如读取数据库) - 所以我想将这些操作提交给有几十个线程的线程池.

起初我想过做一些事情:

myObjectList
    .stream()
    .filter(wrapPredicate(obj -> threadPoolExecutor.submit(
            () -> longQuery(obj)          // returns boolean
    ).get())                              // wait for future & unwrap boolean
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)  // returns Optional
    ))
    .map(wrapFunction(Future::get))
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());
Run Code Online (Sandbox Code Playgroud)

wrapPredicatewrapFunction只是用于检查的异常重新抛出.

但是,显然,调用Future.get()将阻塞流的线程,直到查询完成给定对象,并且流将在此之前不会进展.因此,一次只处理一个对象,并且线程池没有意义.

我可以使用并行流,但是我需要希望默认ForkJoinPool值足够了.或者只是增加"java.util.concurrent.ForkJoinPool.common.parallelism",但我不想为了那个流而改变整个应用程序的设置.我可以在自定义中创建流ForkJoinPool,但我发现它并不能保证并行度.

所以我最终得到了类似的东西,只是为了保证在等待期货完成之前将所有需要的任务提交给threadPool:

myObjectList
    .stream()
    .map(obj -> Pair.of(obj, threadPoolExecutor.submit(
                    () -> longQuery(obj)             // returns boolean
        ))
    )
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .filter(wrapPredicate(p -> p.getRight().get()))  // wait & unwrap future after all tasks are submitted
    .map(Pair::getLeft)
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)             // returns Optional
    ))
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .map(wrapFunction(Future::get))                  // wait & unwrap futures after all submitted
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());
Run Code Online (Sandbox Code Playgroud)

有没有明显更好的方法来实现这一目标?

一种更优雅的方式告诉流"直到现在为流中的每个对象执行流水线步骤",然后继续处理除了.collect(toList()).stream()和更好的方法来过滤效果,而Future不是将其打包到Apache Commons中Pair以便Pair::getRight稍后过滤?或者对问题采取完全不同的方法?