Spark如何并行处理1TB文件?

Pol*_*ase 9 parallel-processing dataframe apache-spark apache-spark-sql

想象中的问题

  • 一个巨大的CSV日志文件,比方说大小为1 TB,该文件位于USB驱动器上
  • 该日志包含世界各地用户的活动日志,我们假设该行包含50列,其中包含Country.
  • 我们希望每个国家/地区的行数按降序排列.
  • 让我们假设Spark集群有足够的RAM节点来处理内存中的整个1TB(20个节点,4个核心CPU,每个节点有64GB RAM)

我的穷人的概念解决方案 使用SparkSQLDatabricks spark-csv

$ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
Run Code Online (Sandbox Code Playgroud)

问题1:Spark如何并行化处理?

我想上述解决方案的大部分执行时间(99%?)是将1TB文件从USB驱动器读入Spark集群.从USB驱动器读取文件是不可并行化的.但是在阅读完整个文件之后,Spark会做些什么来并行处理?

  • 用于创建DataFrame的节点数量是多少?(也许只有一个?)

  • GroupBy&Count使用了多少个节点?我们假设有100多个国家(但Spark还不知道).Spark如何分区以在20个节点上分发100多个国家/地区值?

问题2:如何使Spark应用程序尽可能快? 我想改进的方面是并行化1TB文件的读取.

  • 使用Snappy压缩将CSV文件转换为Parquet文件格式+.我们假设这可以提前完成.

  • 复制HDFS上的Parquet文件.假设Spark集群位于同一个Hadoop集群中,并且数据节点与20个节点Spark集群无关.

  • 将Spark应用程序更改为从HDFS读取.我想Spark现在会使用几个节点来读取文件,因为Parquet是可拆分的.

  • 让我们假设Snappy压缩的Parquet文件小10倍,大小= 100GB,HDFS块大小= 128 MB.共有782个HDFS块.

但那么Spark如何设法使用所有20个节点来创建DataFrame和处理(GroupBy和Count)?Spark每次都使用所有节点吗?

maa*_*asg 5

问题1:Spark如何并行处理(从USB驱动器读取文件)的处理?

这种情况是不可能的。

Spark依赖于hadoop兼容的文件系统来读取文件。挂载USB驱动器时,只能从本地主机访问它。尝试执行

.load("/media/username/myUSBdrive/bogusBigLog1TB.log")
Run Code Online (Sandbox Code Playgroud)

将在集群配置中失败,因为集群中的执行者将无法访问该本地路径。

可以使用本地模式(master=local[*])的Spark读取文件,在这种情况下,您只有1个主机,因此其余问题将不适用。

问题2:如何使Spark应用程序尽可能快?

分而治之。
问题中概述的策略是好的。使用Parquet将允许Spark在数据和仅.select("Country")列上进行投影,从而进一步减少了需要提取的数据量,从而加快了处理速度。

Spark中并行性的基石是分区。同样,在读取文件时,Spark依赖于Hadoop文件系统。从HDFS读取时,分区将取决于HDFS上文件的拆分。这些拆分将平均分配给执行者。这就是Spark最初将工作分配给该工作的所有可用执行者的方式。

我对Catalist优化并不十分熟悉,但是我想我可以假设它.groupBy("Country").agg(count($"Country")会类似于以下内容:rdd.map(country => (country,1)).reduceByKey(_+_) 映射操作不会影响分区,因此可以在现场进行应用。reduceByKey将首先在每个分区上本地应用,部分结果将在驱动程序上合并。因此,大多数计数发生在群集中,并且将其集中化。


zer*_*323 3

从 USB 驱动器读取文件是不可并行的。

USB 驱动器或任何其他数据源也适用相同的规则。任何一个源都可以从驱动程序访问,并且所有工作计算机和数据都是并行访问的(直到源限制),或者根本不访问数据,您会遇到异常。

有多少个节点用于创建 DataFrame?(也许只有一个?)

假设可以从所有计算机访问文件,这取决于配置。首先,您应该查看拆分大小。

有多少个节点用于 GroupBy 和 Count?

这再次取决于配置。