br0*_*ipe 5 scala count apache-spark rdd spark-dataframe
我正在使用来自Join的一些记录创建一个新的DataFrame.
val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
Run Code Online (Sandbox Code Playgroud)
除计数操作外,一切都很快(不到一秒).RDD转换开始并且完成需要几个小时.有什么方法可以加快速度吗?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
Run Code Online (Sandbox Code Playgroud)
Har*_*edi 17
除计数操作外,一切都很快(不到一秒).
这是有道理如下:在之前的所有操作count都称为转换和这种类型的火花操作都是懒惰的,即它不调用一个动作(前做任何计算count在你的例子).
第二个问题是repartition(1):
请记住,你将失去spark提供的所有并行性,你的计算将在一个执行器中运行(核心,如果你处于独立模式),所以你必须删除这一步或将1更改为数字命题到数字您的CPU核心(独立模式)或执行器数量(集群模式).
RDD转换开始并且完成需要几个小时.
如果我理解正确你会DataFrame转向一个RDD,这在火花中真的是一个不好的做法,你应尽可能避免这种转换.这是因为在数据DataFrame和Dataset使用的编码特殊的火花编码器,它需要很多的内存小于JVM的序列编码器(这就是所谓的tungstant如果我也记得吧),所以这种转换意味着火花将您的数据的类型,从他自己的改变一个(占用更少的内存,让火花通过仅编译数据来优化大量的换向,而不是将数据序列化,然后反序列化)到JVM数据类型,这就是为什么DataFrames和Datasets非常强大而不是RDDs
希望这对你有所帮助
| 归档时间: |
|
| 查看次数: |
10897 次 |
| 最近记录: |