在 Spark mapParitions 中使用 Java 8 parallelStream

Luc*_*ess 5 parallel-processing java-8 apache-spark spark-streaming

我试图了解 Spark 并行性中 Java 8 并行流的行为。当我运行下面的代码时,我期望的输出大小listOfThings与输入大小相同。但事实并非如此,我有时会在输出中丢失项目。这种行为是不一致的。如果我只是遍历迭代器而不是使用parallelStream,那么一切都很好。每次都计算匹配。

// listRDD.count = 10
JavaRDD test = listRDD.mapPartitions(iterator -> {
    List listOfThings = IteratorUtils.toList(iterator);
    return listOfThings.parallelStream.map(
        //some stuff here
    ).collect(Collectors.toList());
});
// test.count = 9
// test.count = 10
// test.count = 8
// test.count = 7
Run Code Online (Sandbox Code Playgroud)

Aka*_*thi 0

  1. 这是一个非常好的问题。
  2. 这是怎么回事Race Condition。当您并行化流时,然后将完整列表流拆分为几个相等的部分[基于可用线程和列表大小],然后它尝试在每个可用线程上独立运行子部分来执行工作。

但您还使用 apache Spark,它以更快地计算工作而闻名,即通用计算引擎。Spark 使用相同的方法[并行化工作]来执行操作。

现在,在这个场景中,发生的是 Spark 已经并行化了整个工作,然后在这个场景中,您再次并行化工作,因为竞争条件开始,即 Spark 执行器开始处理工作,然后您并行化工作,然后流进程获取其他线程并开始处理如果正在处理流工作的线程在 SPARK 执行器完成其工作之前完成工作,则它会添加结果,否则 SPARK 执行器将继续向 Master 报告结果。

  1. 这不是重新并行化工作的好方法,它总是会给您带来痛苦,让 Spark 为您做这件事。

希望你明白这里发生了什么

谢谢