相关疑难解决方法(0)

Java 8流和RxJava可观察量之间的差异

Java 8流是否类似于RxJava observables?

Java 8流定义:

java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.

observable java-8 rx-java java-stream

137
推荐指数
5
解决办法
4万
查看次数

在Java 8中并行生成了多少个线程?

在JDK8中,当我使用parallelStream时会产生多少个线程?例如,在代码中:

list.parallelStream().forEach(/** Do Something */);
Run Code Online (Sandbox Code Playgroud)

如果此列表包含100000个项目,将生成多少个线程?

另外,每个线程都可以获得相同数量的项目,还是随机分配?

java multithreading java.util.concurrent java-8

59
推荐指数
2
解决办法
2万
查看次数

有没有理由不使用Java 8的parallelSort?

我正在阅读有关Java 和的区别的问题,这已经有几年了。令我惊讶的是,只有一个问题提到了使用的任何缺点; 也就是说,如果您使用大量CPU,则速度会降低。Arrays.sortArrays.parallelSortparallelSort

假设您不在某种专门的单线程环境中,应该总是选择一个parallelSort吗?有没有理由不这样做?请注意,上述问题的答案之一是,如果少于4096个元素,则无论如何都会parallelSort调用sort

java sorting parallel-processing

52
推荐指数
3
解决办法
3332
查看次数

官方文档在哪里说Java的并行流操作使用fork/join?

以下是我对Java 8 的Stream框架的理解:

  1. 东西产生了源
  2. 该实现负责提供BaseStream#parallel()方法,该方法依次返回可以并行运行其操作的Stream.

虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).

我只能搜索搜索结果的花絮是:


所以我的问题是:

在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?

java fork-join java-8 forkjoinpool java-stream

24
推荐指数
2
解决办法
6295
查看次数

在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK.这是一个错误吗?

考虑以下情况:我们使用Java 8并行流来执行并行forEach循环,例如,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
Run Code Online (Sandbox Code Playgroud)

并行线程的数量由系统属性"java.util.concurrent.ForkJoinPool.common.parallelism"控制,通常等于处理器的数量.

现在假设我们想限制特定工作的并行执行次数 - 例如因为该部分是内存密集型而内存约束意味着并行执行的限制.

限制并行执行的一种明显而优雅的方法是使用信号量(这里建议),例如,下面的代码片段将并行执行的数量限制为5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });
Run Code Online (Sandbox Code Playgroud)

这很好用!

但是:在worker(at /* WORK DONE HERE */)中使用任何其他并行流可能会导致死锁.

对我来说,这是一个意外的行为.

说明:由于Java流使用ForkJoin池,因此内部forEach正在分叉,并且连接似乎正在等待.但是,这种行为仍然是出乎意料的.请注意,如果设置"java.util.concurrent.ForkJoinPool.common.parallelism"为1 ,并行流甚至可以工作.

另请注意,如果存在内部并行forEach,则它可能不透明.

问题: 这种行为是否符合Java 8规范(在这种情况下,它意味着禁止在并行流工作者中使用信号量)或者这是一个错误?

为方便起见:下面是一个完整的测试用例.除了"true,true"之外,两个布尔值的任何组合都有效,这会导致死锁.

澄清:为了明确这一点,让我强调一个方面:acquire信号量不会发生死锁.请注意,代码包含

  1. 获得信号量
  2. 运行一些代码
  3. 释放信号量

如果该段代码使用另一个并行流,则死锁发生在2. 然后在OTHER流内发生死锁.因此,似乎不允许一起使用嵌套并行流和阻塞操作(如信号量)!

请注意,记录并行流使用ForkJoinPool并且ForkJoinPool和Semaphore属于同一个包 - java.util.concurrent(因此可以预期它们可以很好地互操作).

/*
 * (c) Copyright Christian P. Fries, …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-8 java-stream

21
推荐指数
2
解决办法
5721
查看次数

排序并行流时遇到Encounter错误

我有一Record节课:

public class Record implements Comparable<Record>
{
   private String myCategory1;
   private int    myCategory2;
   private String myCategory3;
   private String myCategory4;
   private int    myValue1;
   private double myValue2;

   public Record(String category1, int category2, String category3, String category4,
      int value1, double value2)
   {
      myCategory1 = category1;
      myCategory2 = category2;
      myCategory3 = category3;
      myCategory4 = category4;
      myValue1 = value1;
      myValue2 = value2;
   }

   // Getters here
}
Run Code Online (Sandbox Code Playgroud)

我创建了很多记录的大清单.仅第二和第五值,i / 10000并且i,将在后面使用的,由吸气剂getCategory2()getValue1()分别.

List<Record> list = new ArrayList<>(); …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing java-8 java-stream

21
推荐指数
1
解决办法
1562
查看次数

来自HashSet的并行流不会并行运行

我有要并行处理的元素集合.当我使用a时List,并行性有效.但是,当我使用a时Set,它并不是并行运行的.

我写了一个显示问题的代码示例:

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing lambda java-8 java-stream

20
推荐指数
1
解决办法
5992
查看次数

如何为CompletableFuture :: supplyAsync选择Executor

CompletableFuture::supplyAsync(() -> IO bound queries)

如何为CompletableFuture :: supplyAsync选择Executor以避免污染ForkJoinPool.commonPool().

有许多选项Executors(newCachedThreadPool,newWorkStealingPool,newFixedThreadPool等)

在这里阅读了关于新ForkJoinPool的内容

如何为我的用例选择合适的?

java executorservice java-8 threadpoolexecutor completable-future

14
推荐指数
1
解决办法
4726
查看次数

为什么并行流不使用ForkJoinPool的所有线程?

所以我知道,如果你使用parallelStream没有自定义ForkJoinPool,它将使用默认的ForkJoinPool,默认情况下,只有一个线程,因为你有处理器.

因此,如此处所述(以及该问题的另一个答案)为了获得更多的并行性,您必须:

将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() - > stream.parallel().forEach(doSomething));

所以,我这样做了:

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) { …
Run Code Online (Sandbox Code Playgroud)

java concurrency multithreading java-8 java-stream

14
推荐指数
2
解决办法
2万
查看次数

ForkJoinPool.commonPool()相当于没有池吗?

我已经确定使用并行流确实比我的数据集的串行流更快.话虽如此,我想知道在这个问题中讨论使用的ForkJoinPool:Java 8并行流中的自定义线程池.

鉴于,

void foo()
{
     barCollection.parallelStream()  … do something with the stream
}
Run Code Online (Sandbox Code Playgroud)

相对于哪个池将使用1和2以下?

1)

ForkJoinPool.commonPool().submit(()->foo()).get();
Run Code Online (Sandbox Code Playgroud)

2)

foo();
Run Code Online (Sandbox Code Playgroud)

如果答案是肯定的,那么为什么该ForkJoinPol.commonPool()方法存在?

java multithreading java-8

10
推荐指数
1
解决办法
3586
查看次数