是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.
想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.
如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.
请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …
Run Code Online (Sandbox Code Playgroud) 所以我知道,如果你使用parallelStream
没有自定义ForkJoinPool,它将使用默认的ForkJoinPool,默认情况下,只有一个线程,因为你有处理器.
因此,如此处所述(以及该问题的另一个答案)为了获得更多的并行性,您必须:
将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() - > stream.parallel().forEach(doSomething));
所以,我这样做了:
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool(1000);
IntStream stream = IntStream.range(0, 999999);
final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());
forkJoinPool.submit(() -> {
stream.parallel().forEach(n -> {
System.out.println("Processing n: " + n);
try {
Thread.sleep(500);
thNames.add(Thread.currentThread().getName());
System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
} catch (Exception e) { …
Run Code Online (Sandbox Code Playgroud) 当我运行以下代码时,8 个可用线程中只有 2 个可以运行,任何人都可以解释为什么会出现这种情况吗?我怎样才能改变代码,使其能够利用所有 8 个线程?
Tree.java
:
package il.co.roy;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class Tree<T>
{
private final T data;
private final Set<Tree<T>> subTrees;
public Tree(T data, Set<Tree<T>> subTrees)
{
this.data = data;
this.subTrees = subTrees;
}
public Tree(T data)
{
this(data, new HashSet<>());
}
public Tree()
{
this(null);
}
public T getData()
{
return data;
}
public Set<Tree<T>> getSubTrees()
{
return subTrees;
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true; …
Run Code Online (Sandbox Code Playgroud) 我有一个我想要处理的对象列表,Java8流API看起来是最干净和可读的方式.
但是我需要对这些对象执行的一些操作包括阻塞IO(比如读取数据库) - 所以我想将这些操作提交给有几十个线程的线程池.
起初我想过做一些事情:
myObjectList
.stream()
.filter(wrapPredicate(obj -> threadPoolExecutor.submit(
() -> longQuery(obj) // returns boolean
).get()) // wait for future & unwrap boolean
.map(filtered -> threadPoolExecutor.submit(
() -> anotherQuery(filtered) // returns Optional
))
.map(wrapFunction(Future::get))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
Run Code Online (Sandbox Code Playgroud)
该wrapPredicate
和wrapFunction
只是用于检查的异常重新抛出.
但是,显然,调用Future.get()
将阻塞流的线程,直到查询完成给定对象,并且流将在此之前不会进展.因此,一次只处理一个对象,并且线程池没有意义.
我可以使用并行流,但是我需要希望默认ForkJoinPool
值足够了.或者只是增加"java.util.concurrent.ForkJoinPool.common.parallelism"
,但我不想为了那个流而改变整个应用程序的设置.我可以在自定义中创建流ForkJoinPool
,但我发现它并不能保证并行度.
所以我最终得到了类似的东西,只是为了保证在等待期货完成之前将所有需要的任务提交给threadPool:
myObjectList
.stream()
.map(obj -> Pair.of(obj, threadPoolExecutor.submit(
() -> longQuery(obj) // returns boolean
))
)
.collect(toList()).stream() // terminate stream to actually submit tasks to the …
Run Code Online (Sandbox Code Playgroud)