vef*_*hym 6 java scala kryo apache-spark
我有一个JavaPairRDD<Integer, Integer[]>我想要执行的groupByKey操作.
这个groupByKey动作给了我一个:
org.apache.spark.shuffle.MetadataFetchFailedException:缺少shuffle的输出位置
如果我没有弄错的话,这实际上是一个OutOfMemory错误.这只发生在大数据集中(在我的情况下,当Web UI中显示的"Shuffle Write"为~96GB时).
我已经设定:
spark.serializer org.apache.spark.serializer.KryoSerializer
in $SPARK_HOME/conf/spark-defaults.conf,但我不确定Kryo是否用于序列化我的JavaPairRDD.
除了设置这个conf参数之外,我还应该做些什么才能使用Kryo序列化我的RDD?我可以在序列化说明中看到:
Spark自动包含Kryo序列化程序,用于来自Twitter chill库的AllScalaRegistrar中涵盖的许多常用核心Scala类.
然后:
从Spark 2.0.0开始,我们在使用简单类型,简单类型数组或字符串类型对RDD进行混洗时,内部使用Kryo序列化程序.
我还注意到,当我将spark.serializer设置为Kryo时,Web UI中的Shuffle Write从~96GB(默认序列化器)增加到243GB!
编辑:在评论中,我被问及我的程序的逻辑,以防groupByKey可以用reduceByKey替换.我不认为这是可能的,但无论如何它在这里:
输入具有以下形式:
shuffle write操作以以下形式生成对:
该groupByKey操作收集每个实体的所有邻居阵列,其中一些可能出现不止一次(在许多桶中).
在groupByKey操作之后,我为每个桶保留一个权重(基于它包含的负实体id的数量),并且对于每个邻居id,我总结它所属的桶的权重.
我将每个邻居id的分数标准化为另一个值(假设它已经给出)并且每个实体发出前3个邻居.
我得到的不同密钥的数量大约是1000万(大约500万个正实体ID和500万个负数).
EDIT2:我尝试分别使用Hadoop的Writables(VIntWritable和VIntArrayWritable扩展ArrayWritable)而不是Integer和Integer [],但是shuffle大小仍然比默认的JavaSerializer大.
然后我spark.shuffle.memoryFraction将从0.2增加到0.4(即使在版本2.1.0中已弃用,也没有应该使用的内容的描述)并启用了offHeap内存,并且随机播放的大小减少了~20GB.即使这符合标题所要求的内容,我更倾向于使用更算法的解决方案,或者包含更好压缩的解决方案.
简短回答:使用fastutil并可能增加spark.shuffle.memoryFraction.
更多细节:
这个 RDD 的问题是 Java 需要存储Object引用,这比原始类型消耗更多的空间。在此示例中,我需要存储Integers,而不是int值。JavaInteger占用 16 个字节,而原始 Javaint占用 4 个字节。Int另一方面,Scala 的类型是 32 位(4 字节)类型,就像 Java 的类型一样int,这就是为什么使用 Scala 的人可能不会遇到类似的情况。
除了将 增加到spark.shuffle.memoryFraction0.4 之外,另一个不错的解决方案是使用fastutil 库,如Spark 调优文档中的建议:
减少内存消耗的第一个方法是避免增加开销的 Java 功能,例如基于指针的数据结构和包装对象。有几种方法可以做到这一点: 设计数据结构时首选对象数组和基本类型,而不是标准 Java 或 Scala 集合类(例如 HashMap)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。
这使得我能够将 RDD 对的 int 数组中的每个元素存储为一种int类型(即,对于数组的每个元素使用 4 个字节而不是 16 个字节)。就我而言,我使用IntArrayList而不是Integer[]. 这使得 shuffle 大小显着下降,并允许我的程序在集群中运行。我还在代码的其他部分使用了这个库,在那里我制作了一些临时的 Map 结构。总体而言,通过增加到spark.shuffle.memoryFraction0.4 并使用 fastutil 库,使用默认 Java 序列化器(不是 Kryo)时,shuffle 大小从 96GB 下降到 50GB(!)。
替代方案:我还尝试对 rdd 对的每个 int 数组进行排序,并使用 Hadoop 的 VIntArrayWritable 类型存储增量(较小的数字比较大的数字使用更少的空间),但这也需要在 Kryo 中注册 VIntWritable 和 VIntArrayWritable,这并没有保存毕竟任何空间。总的来说,我认为 Kryo 只会让事情运行得更快,但不会减少所需的空间,但我仍然不确定这一点。
我还没有将此答案标记为已接受,因为其他人可能有更好的想法,而且因为我毕竟没有像我的 OP 所要求的那样使用 Kryo。我希望阅读它能够帮助其他遇到同样问题的人。如果我设法进一步减小随机播放大小,我将更新此答案。
| 归档时间: |
|
| 查看次数: |
1257 次 |
| 最近记录: |