我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).
原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者在写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)或repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.
关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.
首先,我将数据帧保存到内存中:
df.cache().count
Run Code Online (Sandbox Code Playgroud)
Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)
这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这又导致10'792'965'376字节的不同大小=〜10.8GB.
我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).
SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?
我在 Spark 集群配置和运行 Pyspark 管道方面非常有经验,但我才刚刚开始使用 Beam。因此,我正在尝试在 Spark PortableRunner(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员具有 4 个内核和 8GB RAM)上的 Pyspark 和 Beam python SDK 之间进行逐个比较,并且我'已经决定为一个相当大的数据集进行 wordcount 作业,将结果存储在 Parquet 表中。
因此,我下载了 50GB 的 Wikipedia 文本文件,分成大约 100 个未压缩的文件,并将它们存储在目录中/mnt/nfs_drive/wiki_files/(/mnt/nfs_drive是安装在所有工作人员上的 NFS 驱动器)。
首先,我正在运行以下 Pyspark wordcount 脚本:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', …Run Code Online (Sandbox Code Playgroud) 我刚刚从 Spark 2.0.2 升级到 Spark 2.1.0(通过下载 Hadoop 2.7 及更高版本的预构建版本)。没有安装 Hive。
spark-shell 启动后,会在启动位置创建metastore_db/文件夹和derby.log文件,以及一堆警告日志(在以前的版本中没有打印出来)。
仔细检查调试日志显示 Spark 2.1.0 尝试初始化 a HiveMetastoreConnection:
17/01/13 09:14:44 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
Spark 2.0.2 的类似调试日志没有显示HiveMetastoreConnection.
这是预期的行为吗?是否与spark.sql.warehouse.dir现在会话之间共享的静态配置有关?我如何避免这种情况,因为我没有安装 Hive?
提前致谢!