在并行Java流中处理随机数

jan*_*nko 4 java parallel-processing java-stream

我想从0到50范围内生成5个不同的随机数,然后并行执行一些操作.当我写这个程序时,程序永远不会结束:

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));
Run Code Online (Sandbox Code Playgroud)

我试图使用peek调试它.我有无限的c:行数,50 d:行,但零l:s:行:

new Random().ints(0, 50)
            .peek(d -> System.out.println("c: " + d))
            .distinct()
            .peek(d -> System.out.println("d: " + d))
            .limit(5)
            .peek(d -> System.out.println("l: " + d))
            .parallel()
            .forEach(d -> System.out.println("s: " + d));
Run Code Online (Sandbox Code Playgroud)

我的实施有什么问题?

Tag*_*eev 5

首先,请注意.parallel()改变整个管道的并行状态,因此它影响所有操作,而不仅仅是后续操作.在你的情况下

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));
Run Code Online (Sandbox Code Playgroud)

是相同的

new Random().ints(0, 50)
            .parallel()
            .distinct()
            .limit(5)
            .forEach(d -> System.out.println("s: " + d));
Run Code Online (Sandbox Code Playgroud)

您不能仅并行化部分管道.它是平行还是不平行.

现在回到你的问题.由于Random.ints是一个无序流的无序实现distinctlimit被选择,所以它不是一个重复这个问题(这里的问题是在有序的不同实现).这里的问题在于无序limit()实现.为了减少可能的争用它不检查在不同的线程中发现的元素的总数,直到每个子任务得到至少128个元件或上游耗尽(见实施,1 << 7 = 128).在你的情况下,上游distinct()发现只有50个不同的元素,拼命遍历输入,希望找到更多,但下游limit()不发出信号停止处理,因为它想要在检查是否达到限制之前收集至少128个元素(不是很聪明,因为限制小于128).所以要使这个东西工作,你应该至少选择(128*个CPU数量)不同的元素.在我的4核机器使用new Random().ints(0, 512)成功而new Random().ints(0, 511)卡住.

为了解决这个问题,我建议按顺序收集随机数并在那里创建一个新流:

int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
      .forEach(d -> System.out.println("s: " + d));
Run Code Online (Sandbox Code Playgroud)

我假设你想要执行一些昂贵的下游处理.在这种情况下,并行生成5个随机数并不是很有用.顺序执行时,此部分将更快.

更新:提交了错误报告并提交了补丁.