Java 8流是否类似于RxJava observables?
Java 8流定义:
新
java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.
在JDK8中,当我使用parallelStream时会产生多少个线程?例如,在代码中:
list.parallelStream().forEach(/** Do Something */);
Run Code Online (Sandbox Code Playgroud)
如果此列表包含100000个项目,将生成多少个线程?
另外,每个线程都可以获得相同数量的项目,还是随机分配?
我正在阅读有关Java 和的区别的问题,这已经有几年了。令我惊讶的是,只有一个问题提到了使用的任何缺点; 也就是说,如果您使用大量CPU,则速度会降低。Arrays.sortArrays.parallelSortparallelSort
假设您不在某种专门的单线程环境中,应该总是选择一个parallelSort吗?有没有理由不这样做?请注意,上述问题的答案之一是,如果少于4096个元素,则无论如何都会parallelSort调用sort。
以下是我对Java 8 的Stream框架的理解:
虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).
我只能搜索搜索结果的花絮是:
Lambda的状态:Libraries Edition("引擎盖下的并行")
Vaguely提到Stream框架和Fork/Join机制.
Fork/Join机器旨在实现此过程的自动化.
JEP 107:集合的批量数据操作
几乎直接表明Collection接口的默认方法#parallelStream()使用Fork/Join实现自身.但仍然没有关于公共池.
并行实现基于Java 7中引入的java.util.concurrency Fork/Join实现.
类数组(Javadoc)
直接表示使用公共池的多次.
ForkJoin公共池用于执行任何并行任务.
所以我的问题是:
在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?
考虑以下情况:我们使用Java 8并行流来执行并行forEach循环,例如,
IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
Run Code Online (Sandbox Code Playgroud)
并行线程的数量由系统属性"java.util.concurrent.ForkJoinPool.common.parallelism"控制,通常等于处理器的数量.
现在假设我们想限制特定工作的并行执行次数 - 例如因为该部分是内存密集型而内存约束意味着并行执行的限制.
限制并行执行的一种明显而优雅的方法是使用信号量(这里建议),例如,下面的代码片段将并行执行的数量限制为5:
final Semaphore concurrentExecutions = new Semaphore(5);
IntStream.range(0,20).parallel().forEach(i -> {
concurrentExecutions.acquireUninterruptibly();
try {
/* WORK DONE HERE */
}
finally {
concurrentExecutions.release();
}
});
Run Code Online (Sandbox Code Playgroud)
这很好用!
但是:在worker(at /* WORK DONE HERE */)中使用任何其他并行流可能会导致死锁.
对我来说,这是一个意外的行为.
说明:由于Java流使用ForkJoin池,因此内部forEach正在分叉,并且连接似乎正在等待.但是,这种行为仍然是出乎意料的.请注意,如果设置"java.util.concurrent.ForkJoinPool.common.parallelism"为1 ,并行流甚至可以工作.
另请注意,如果存在内部并行forEach,则它可能不透明.
问题: 这种行为是否符合Java 8规范(在这种情况下,它意味着禁止在并行流工作者中使用信号量)或者这是一个错误?
为方便起见:下面是一个完整的测试用例.除了"true,true"之外,两个布尔值的任何组合都有效,这会导致死锁.
澄清:为了明确这一点,让我强调一个方面:acquire信号量不会发生死锁.请注意,代码包含
如果该段代码使用另一个并行流,则死锁发生在2. 然后在OTHER流内发生死锁.因此,似乎不允许一起使用嵌套并行流和阻塞操作(如信号量)!
请注意,记录并行流使用ForkJoinPool并且ForkJoinPool和Semaphore属于同一个包 - java.util.concurrent(因此可以预期它们可以很好地互操作).
/*
* (c) Copyright Christian P. Fries, …Run Code Online (Sandbox Code Playgroud) 我有一Record节课:
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
Run Code Online (Sandbox Code Playgroud)
我创建了很多记录的大清单.仅第二和第五值,i / 10000并且i,将在后面使用的,由吸气剂getCategory2()和getValue1()分别.
List<Record> list = new ArrayList<>(); …Run Code Online (Sandbox Code Playgroud) 我有要并行处理的元素集合.当我使用a时List,并行性有效.但是,当我使用a时Set,它并不是并行运行的.
我写了一个显示问题的代码示例:
public static void main(String[] args) {
ParallelTest test = new ParallelTest();
List<Integer> list = Arrays.asList(1,2);
Set<Integer> set = new HashSet<>(list);
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
System.out.println("set print");
try {
forkJoinPool.submit(() ->
set.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
System.out.println("\n\nlist print");
try {
forkJoinPool.submit(() ->
list.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
}
private void print(int i){
System.out.println("start: " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("end: …Run Code Online (Sandbox Code Playgroud) CompletableFuture::supplyAsync(() -> IO bound queries)
如何为CompletableFuture :: supplyAsync选择Executor以避免污染ForkJoinPool.commonPool().
有许多选项Executors(newCachedThreadPool,newWorkStealingPool,newFixedThreadPool等)
我在这里阅读了关于新ForkJoinPool的内容
如何为我的用例选择合适的?
java executorservice java-8 threadpoolexecutor completable-future
所以我知道,如果你使用parallelStream没有自定义ForkJoinPool,它将使用默认的ForkJoinPool,默认情况下,只有一个线程,因为你有处理器.
因此,如此处所述(以及该问题的另一个答案)为了获得更多的并行性,您必须:
将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() - > stream.parallel().forEach(doSomething));
所以,我这样做了:
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool(1000);
IntStream stream = IntStream.range(0, 999999);
final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());
forkJoinPool.submit(() -> {
stream.parallel().forEach(n -> {
System.out.println("Processing n: " + n);
try {
Thread.sleep(500);
thNames.add(Thread.currentThread().getName());
System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
} catch (Exception e) { …Run Code Online (Sandbox Code Playgroud) 我已经确定使用并行流确实比我的数据集的串行流更快.话虽如此,我想知道在这个问题中讨论使用的ForkJoinPool:Java 8并行流中的自定义线程池.
鉴于,
void foo()
{
barCollection.parallelStream() … do something with the stream
}
Run Code Online (Sandbox Code Playgroud)
相对于哪个池将使用1和2以下?
1)
ForkJoinPool.commonPool().submit(()->foo()).get();
Run Code Online (Sandbox Code Playgroud)
2)
foo();
Run Code Online (Sandbox Code Playgroud)
如果答案是肯定的,那么为什么该ForkJoinPol.commonPool()方法存在?
java ×9
java-8 ×9
java-stream ×6
concurrency ×2
fork-join ×1
forkjoinpool ×1
lambda ×1
observable ×1
rx-java ×1
sorting ×1