mab*_*abi 5 java producer-consumer java-8 completable-future
我正在玩弄 Java8 的流和CompletableFutures。我预先存在的代码有一个类,它接受一个 URL 并下载它:
public class FileDownloader implements Runnable {
private URL target;
public FileDownloader(String target) {
this.target = new URL(target);
}
public void run() { /* do it */ }
}
Run Code Online (Sandbox Code Playgroud)
现在,这个类从另一个发出的部分List<String>(单个主机上的多个目标)获取它的信息。
我已将周围的代码切换为CompletableFuture:
public class Downloader {
public static void main(String[] args) {
List<String> hosts = fetchTargetHosts();
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync((files) -> {
for (String target : files) {
new FileDownloader(target).run();
}
});
}
}
public static class HostDownloader implements Supplier<List<String>> {
/* not shown */
}
/* My implementation should either be Runnable or Consumer.
Please suggest based on a idiomatic approach to the main loop.
*/
public static class FileDownloader implements Runnable, Consumer<String> {
private String target;
public FileDownloader(String target) {
this.target = target;
}
@Override
public void run() { accept(this.target); }
@Override
public void accept(String target) {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,这感觉不自然。我正在生成一个Strings流,我一次FileDownloader消耗其中一个。是否有现成的东西可以使我的单个值Consumer与Lists 一起使用,还是我在for这里陷入循环?
我知道将循环移动到 中accept并制作一个是微不足道的Consumer<List<String>>,这不是重点。
将两个直接相关的步骤分解为两个异步步骤是没有意义的。他们仍然是依赖的,如果分离有任何影响,那也不会是积极的。
\n\n你可以简单地使用
\n\nList<String> hosts = fetchTargetHosts();\nFileDownloader fileDownloader = new FileDownloader();\nfor(String host: hosts)\n CompletableFuture.runAsync(()->\n new HostDownloader(host).get().forEach(fileDownloader));\nRun Code Online (Sandbox Code Playgroud)\n\n或者,假设FileDownloader没有关于下载的可变状态:
for(String host: hosts)\n CompletableFuture.runAsync(()->\n new HostDownloader(host).get().parallelStream().forEach(fileDownloader));\nRun Code Online (Sandbox Code Playgroud)\n\nsupplyAsync这仍然具有与使用plus的原始方法相同的并发级别thenAcceptAsync,只是因为这两个依赖步骤无论如何都可以同时运行,因此简单的解决方案是将这两个步骤放入一个将异步执行的简洁操作中。
不过,此时\xe2\x80\x99s值得注意的是,CompletableFuture不建议整个使用此操作。正如\xe2\x80\x99s 文档所述:
\n\n\n\n
\n- 所有没有显式 Executor 参数的异步方法都使用
\nForkJoinPool.commonPool()
公共池的问题在于,它预先配置的并发级别取决于CPU核心的数量,并且如果线程在I/O操作期间被阻塞,则不会调整并发级别。换句话说,它不适合I/O操作。
\n\n与 不同的是Stream,允许您为异步CompletableFuture操作指定 an ,因此您可以配置自己的适合 I/O 操作,另一方面,当您无论如何处理 an 时,根本不需要,至少不需要对于这样一个简单的任务:ExecutorExecutorExecutorCompletableFuture
List<String> hosts = fetchTargetHosts();\n\nint concurrentHosts = 10;\nint concurrentConnections = 100;\nExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);\nExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);\n\nFileDownloader fileDownloader = new FileDownloader();\nfor(String host: hosts) hostEs.execute(()-> {\n for(String target: new HostDownloader(host).get())\n connEs.execute(()->fileDownloader.accept(target));\n});\nRun Code Online (Sandbox Code Playgroud)\n\n在这个地方,您可以考虑将 的代码内联FileDownloader.accept到 lambda 表达式中,或者将其恢复为 a Runnable,以便您可以将内部循环\xe2\x80\x99s 语句更改为connEs.execute(new FileDownloader(target))。