小编hir*_*ryu的帖子

计算Spark数据帧的大小 - SizeEstimator会产生意外结果

我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).

原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.

关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.

首先,我将数据帧保存到内存中:

df.cache().count 
Run Code Online (Sandbox Code Playgroud)

Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)

这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这又导致10'792'965'376字节的不同大小=〜10.8GB.

我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).

SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?

apache-spark spark-dataframe

10
推荐指数
3
解决办法
1万
查看次数

使用 Python SDK 在 Spark 上运行 Apache Beam wordcount 管道时并行度低

我在 Spark 集群配置和运行 Pyspark 管道方面非常有经验,但我才刚刚开始使用 Beam。因此,我正在尝试在 Spark PortableRunner(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员具有 4 个内核和 8GB RAM)上的 Pyspark 和 Beam python SDK 之间进行逐个比较,并且我'已经决定为一个相当大的数据集进行 wordcount 作业,将结果存储在 Parquet 表中。

因此,我下载了 50GB 的 Wikipedia 文本文件,分成大约 100 个未压缩的文件,并将它们存储在目录中/mnt/nfs_drive/wiki_files//mnt/nfs_drive是安装在所有工作人员上的 NFS 驱动器)。

首先,我正在运行以下 Pyspark wordcount 脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-beam

6
推荐指数
1
解决办法
483
查看次数

预构建 Spark 2.1.0 在启动 spark-shell 时创建 metastore_db 文件夹和 derby.log

我刚刚从 Spark 2.0.2 升级到 Spark 2.1.0(通过下载 Hadoop 2.7 及更高版本的预构建版本)。没有安装 Hive。

spark-shell 启动后,会在启动位置创建metastore_db/文件夹和derby.log文件,以及一堆警告日志(在以前的版本中没有打印出来)。

仔细检查调试日志显示 Spark 2.1.0 尝试初始化 a HiveMetastoreConnection

17/01/13 09:14:44 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.

Spark 2.0.2 的类似调试日志没有显示HiveMetastoreConnection.

这是预期的行为吗?是否与spark.sql.warehouse.dir现在会话之间共享的静态配置有关?我如何避免这种情况,因为我没有安装 Hive?

提前致谢!

apache-spark apache-spark-2.0

1
推荐指数
2
解决办法
5836
查看次数