sam*_*est 9 scala dataframe apache-spark apache-spark-sql
有没有办法(或任何计划)能够将Spark分布式集合(RDDs Dataframe或Datasets)直接转换为Broadcast变量而不需要collect?公共API似乎没有"开箱即用"的东西,但是可以在较低级别完成某些事情吗?
我可以想象这些操作有2倍的加速潜力(或更多?).为了解释我的意思,让我们通过一个例子来解释:
val myUberMap: Broadcast[Map[String, String]] =
sc.broadcast(myStringPairRdd.collect().toMap)
someOtherRdd.map(someCodeUsingTheUberMap)
Run Code Online (Sandbox Code Playgroud)
这会导致所有数据被收集到驱动程序,然后广播数据.这意味着数据通过网络发送两次.
什么会是这样的:
val myUberMap: Broadcast[Map[String, String]] =
myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)
someOtherRdd.map(someCodeUsingTheUberMap)
Run Code Online (Sandbox Code Playgroud)
在这里,Spark可以完全绕过收集数据,只是在节点之间移动数据.
奖金
此外,可能存在类似Monoid的API(有点像combineByKey),对于其中.toMap或者任何操作Array[T]都很昂贵但可以并行完成的情况.例如,构造某些Trie结构可能是昂贵的,这种功能可能导致算法设计的可怕范围.当IO运行时也可以运行此CPU活动 - 当前广播机制正在阻塞(即所有IO,然后是所有CPU,然后是所有IO).
澄清
在这里,连接不是(主要)用例,可以假设我稀疏地使用广播的数据结构.例如,键someOtherRdd不会覆盖键,myUberMap但我不知道我需要哪些键,直到我遍历someOtherRdd并假设我myUberMap多次使用.
我知道所有听起来都有点模糊,但重点是更一般的机器学习算法设计.
虽然理论上这是一个有趣的想法,但我认为虽然理论上可行,但它的实际应用非常有限.显然我不能代表PMC说话,所以我不能说是否有任何计划实施这种类型的广播机制.
可能的实施:
由于Spark已经提供了torrent广播机制,其行为描述如下:
驱动程序将序列化对象划分为小块,并将这些块存储
BlockManager在驱动程序中.在每个执行程序上,执行程序首先尝试从其中获取对象
BlockManager.如果它不存在,则它使用远程提取从驱动程序和/或其他执行程序(如果可用)中获取小块.一旦获得块,它就会将块放在自己的块中
BlockManager,准备好从其他执行器获取.
应该可以为直接节点到节点广播重用相同的机制.
值得注意的是,这种方法不能完全消除驱动程序通信.即使可以在本地创建块,您仍然需要一个单一的事实来源来通告一组块来获取.
应用有限
广播变量的一个问题是相当昂贵.即使您可以消除驱动程序瓶颈,仍然存在两个问题:
第一个问题应该是相对明显的.它不仅涉及直接内存使用,还涉及GC成本及其对总体延迟的影响.第二个是相当微妙的.我在回答" 为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢"的回答中部分地介绍了这一点,但让我们进一步讨论.
从网络流量角度来看,整个数据集与创建笛卡尔积非常相同.因此,如果数据集足够大以使驱动程序成为瓶颈,则它不太可能是广播的良好候选者,并且在实践中可以优选诸如散列连接的目标方法.
替代方案:
有一些方法可用于实现与上面列举的直接广播和地址问题类似的结果,包括: