Rag*_*rni 11 scala apache-spark
我已经创建了两个来自Hive表(PC_ITM和ITEM_SELL)并且大小很大的数据帧,我通过注册表来经常在SQL查询中使用它们.但是因为它们很大,所以需要花费很多时间来获取查询结果.所以我把它们保存为镶木地板文件,然后读取它们并注册为临时表.但是我仍然没有得到良好的性能,所以我播放了那些数据帧,然后注册为如下表格.
PC_ITM_DF=sqlContext.parquetFile("path")
val PC_ITM_BC=sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1=PC_ITM_BC
PC_ITM_DF1.registerAsTempTable("PC_ITM")
ITM_SELL_DF=sqlContext.parquetFile("path")
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1=ITM_SELL_BC.value
ITM_SELL_DF1.registerAsTempTable(ITM_SELL)
sqlContext.sql("JOIN Query").show
Run Code Online (Sandbox Code Playgroud)
但是我仍然无法实现性能,因为这些数据帧没有被广播.
任何人都可以判断这是否是正确的广播和使用方法?`
Kir*_*rst 19
你真的不需要"访问"广播数据帧 - 你只需要使用它,而Spark将在幕后实现广播.该广播功能很好地工作,并且更有意义的sc.broadcast做法.
如果您一次评估所有内容,可能很难理解花费的时间.
您可以将代码分解为多个步骤.这里的关键是执行操作并在您在连接中使用它们之前保留要广播的数据帧.
// load your dataframe
PC_ITM_DF=sqlContext.parquetFile("path")
// mark this dataframe to be stored in memory once evaluated
PC_ITM_DF.persist()
// mark this dataframe to be broadcast
broadcast(PC_ITM_DF)
// perform an action to force the evaluation
PC_ITM_DF.count()
Run Code Online (Sandbox Code Playgroud)
这样做可以确保数据帧
现在运行时,sqlContext.sql("JOIN Query").show您应该在Spark UI的SQL选项卡中看到"广播散列连接".
我会将 rdd 缓存在内存中。下次需要时,spark 将从内存中读取 RDD,而不是每次都从头生成 RDD。这是快速入门文档的链接。
val PC_ITM_DF = sqlContext.parquetFile("path")
PC_ITM_DF.cache()
PC_ITM_DF.registerAsTempTable("PC_ITM")
val ITM_SELL_DF=sqlContext.parquetFile("path")
ITM_SELL_DF.cache()
ITM_SELL_DF.registerAsTempTable("ITM_SELL")
sqlContext.sql("JOIN Query").show
Run Code Online (Sandbox Code Playgroud)
rdd.cache() 是 的简写rdd.persist(StorageLevel.MEMORY_ONLY)。如果您的数据对于仅内存持久性来说太大,您可以选择几个级别的持久性。以下是持久性选项的列表。如果你想手动从缓存中删除 RDD,你可以调用rdd.unpersist().
如果您喜欢广播数据。您必须先在驱动程序上收集它,然后才能广播它。这要求您的 RDD 适合驱动程序(和执行器)的内存。
| 归档时间: |
|
| 查看次数: |
14586 次 |
| 最近记录: |