das*_*555 5 join apache-spark apache-spark-sql pyspark
我了解广播优化的概念。
当连接中的一侧数据较小时,最好只针对较小的一侧进行洗牌。
但为什么不能只使用执行器来进行这种洗牌呢?为什么我们需要使用驱动程序?
如果每个执行器都保存哈希表来映射执行器之间的记录,我认为它应该可以工作。
在 Spark 广播的当前实现中 - 它将数据收集到驱动程序,然后对其进行洗牌,并且对驱动程序的收集操作是我想避免的瓶颈。
关于如何在没有驱动程序内存瓶颈的情况下实现类似的优化有什么想法吗?
您是对的,当前的实现需要将数据收集到驱动程序,然后再将其发送到执行器。
已经有一个 JIRA 票证SPARK-17556完全解决了您的建议:
“目前在 Spark SQL 中,为了执行广播连接,驱动程序必须收集 RDD 的结果,然后广播它。这会带来一些额外的延迟。也许可以直接从执行器广播。”
我已从附加文档中复制了建议的解决方案,以使该答案具有自我描述性:
“要向 RDD 添加广播方法以从执行器执行广播,我们需要一些支持工作,如下所示:
- 从驱动程序构造 BroadCastId,BroadCastManager 将提供一个方法来执行此操作。
// Called from driver to create new broadcast id
def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
Run Code Online (Sandbox Code Playgroud)
BroadCastManager可以创建一个具有指定id和持久标签的广播,以推断该广播是执行器广播,并且其数据将备份在hdfs上。
在TorrentBroadcast.writeBlocks中将块写入hdfs,readBlocks按优先级从本地、远程、hdfs读取块。
在构造Broadcast时,我们可以控制是否上传广播数据块
BroadCastManager 发布一个 api 将广播数据放入块管理器
归档时间: |
|
查看次数: |
996 次 |
最近记录: |