是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.
想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.
如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.
请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …Run Code Online (Sandbox Code Playgroud) 我听说Java 8提供了很多关于并发计算的实用程序.因此,我想知道并行化给定for循环的最简单方法是什么?
public static void main(String[] args)
{
Set<Server> servers = getServers();
Map<String, String> serverData = new ConcurrentHashMap<>();
for (Server server : servers)
{
String serverId = server.getIdentifier();
String data = server.fetchData();
serverData.put(serverId, data);
}
}
Run Code Online (Sandbox Code Playgroud) java concurrency for-loop java.util.concurrent concurrent-programming
说我有一个类似的任务:
for(Object object: objects) {
Result result = compute(object);
list.add(result);
}
Run Code Online (Sandbox Code Playgroud)
并行化每个compute()的最简单方法是什么(假设它们已经可并行化)?
我不需要一个严格符合上述代码的答案,只需一般答案.但是如果您需要更多信息:我的任务是IO绑定的,这是针对Spring Web应用程序的,任务将在HTTP请求中执行.
考虑以下简单代码:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
很长一段时间,我认为即使在flatMap. 但是上面的代码打印了所有的“Thread:main”,证明我的想法是错误的。
一种使其并行的简单方法flatMap是收集然后再次流式传输:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.collect(Collectors.toList())
.parallelStream()
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
Run Code Online (Sandbox Code Playgroud)
我想知道是否有更好的方法,以及flatMap仅在调用之前并行化流的设计选择,而不是在调用之后并行化。
========关于问题的更多说明========
从一些答案来看,我的问题似乎没有完全表达出来。正如@Andreas 所说,如果我从 …
我必须遍历一个列表并为每个对象调用一个方法,但要并行执行。在循环之后,还有其他语句,它们必须等待并行方法调用。我怎么能在JAVA中做到这一点?
public void a(List<Object> list) {
for(Object o : list) {
asynchMethod(o); // this n method call must run in the same time
}
// wait for all asynchMethod result
/**
* ...other statements
*/
}
private void asynchMethod(Object o) {
// some code
}
Run Code Online (Sandbox Code Playgroud)