如何在Spark中访问广播的DataFrame

Rag*_*rni 11 scala apache-spark

我已经创建了两个来自Hive表(PC_ITM和ITEM_SELL)并且大小很大的数据帧,我通过注册表来经常在SQL查询中使用它们.但是因为它们很大,所以需要花费很多时间来获取查询结果.所以我把它们保存为镶木地板文件,然后读取它们并注册为临时表.但是我仍然没有得到良好的性能,所以我播放了那些数据帧,然后注册为如下表格.

PC_ITM_DF=sqlContext.parquetFile("path")
val PC_ITM_BC=sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1=PC_ITM_BC
PC_ITM_DF1.registerAsTempTable("PC_ITM")

ITM_SELL_DF=sqlContext.parquetFile("path")
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1=ITM_SELL_BC.value
ITM_SELL_DF1.registerAsTempTable(ITM_SELL)


sqlContext.sql("JOIN Query").show
Run Code Online (Sandbox Code Playgroud)

但是我仍然无法实现性能,因为这些数据帧没有被广播.

任何人都可以判断这是否是正确的广播和使用方法?`

Kir*_*rst 19

你真的不需要"访问"广播数据帧 - 你只需要使用它,而Spark将在幕后实现广播.该广播功能很好地工作,并且更有意义的sc.broadcast做法.

如果您一次评估所有内容,可能很难理解花费的时间.

您可以将代码分解为多个步骤.这里的关键是执行操作并您在连接中使用它们之前保留要广播的数据帧.

// load your dataframe
PC_ITM_DF=sqlContext.parquetFile("path")

// mark this dataframe to be stored in memory once evaluated
PC_ITM_DF.persist()

// mark this dataframe to be broadcast
broadcast(PC_ITM_DF)

// perform an action to force the evaluation
PC_ITM_DF.count()
Run Code Online (Sandbox Code Playgroud)

这样做可以确保数据帧

  • 加载到内存中(持久)
  • 注册为临时表,用于SQL查询
  • 标记为广播,因此将被发送给所有执行者

现在运行时,sqlContext.sql("JOIN Query").show您应该在Spark UI的SQL选项卡中看到"广播散列连接".

  • @AlexNaspo是的,我一直都在使用它.好处是数据在所有节点上完全可用 - 它不再分布 - 这使得在加入时提高了性能.例如,考虑一个包含美国每个人及其邮政编码的DataFrame,然后是一个包含邮政编码 - >州的表格.加入这些需要大量的改组.将相对较小的zip-> state数据帧广播到所有节点,消除了随机播放的需要. (4认同)
  • 广播RDD有什么好处?RDD代表弹性分布式数据集.广播消除了RDD的分布式特性.我可以看到将RDD中的数据收集到内存并进行广播的用例.我不相信这甚至是可能的.如果你看一下[文章](http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love -the-shuffle /)它说.."要广播一个RDD,你需要先在驱动程序节点上收集它()." 你有没有在实践或测试中使用它? (2认同)

Ale*_*spo 0

我会将 rdd 缓存在内存中。下次需要时,spark 将从内存中读取 RDD,而不是每次都从头生成 RDD。这是快速入门文档的链接。

val PC_ITM_DF = sqlContext.parquetFile("path")
PC_ITM_DF.cache()
PC_ITM_DF.registerAsTempTable("PC_ITM")

val ITM_SELL_DF=sqlContext.parquetFile("path")
ITM_SELL_DF.cache()
ITM_SELL_DF.registerAsTempTable("ITM_SELL")
sqlContext.sql("JOIN Query").show
Run Code Online (Sandbox Code Playgroud)

rdd.cache() 是 的简写rdd.persist(StorageLevel.MEMORY_ONLY)。如果您的数据对于仅内存持久性来说太大,您可以选择几个级别的持久性。以下是持久性选项的列表。如果你想手动从缓存中删除 RDD,你可以调用rdd.unpersist().

如果您喜欢广播数据。您必须先在驱动程序上收集它,然后才能广播它。这要求您的 RDD 适合驱动程序(和执行器)的内存。