Spark在内存中执行TB文件

yog*_*oga 3 hadoop apache-spark pyspark

我们假设我有一个Tb数据文件.十个节点集群中的每个节点内存为3GB.

我想用spark处理文件.但One TeraByte如何适应记忆?

它会丢失内存异常吗?

它是如何工作的?

sgv*_*gvd 6

正如Thilo所提到的,Spark不需要在内存中加载所有内容以便能够处理它.这是因为Spark会将数据划分为更小的块并分别对这些块进行操作.分区数量,这个大小取决于几个方面:

  • 存储文件的位置.Spark最常用的选项已经将文件存储在一堆块中,而不是作为单个大块数据存储.例如,如果它存储在HDFS中,则默认情况下这些块为64MB,并且块在节点之间分布(和复制).对于存储在S3中的文件,您将获得32MB的块.这是由Hadoop定义的,Hadoop FileSystem用于读取文件并应用于其他文件系统的文件,Spark使用Hadoop来处理这些文件.
  • 你做的任何重新分配.您可以调用repartition(N)coalesce(N)在RDD或DataFrame上调整分区数量,从而更改分区大小.coalesce对于在不跨节点的数据进行混洗的情况下减少数量是首选,而repartition允许您指定一种新的方式来分割数据,即更好地控制在同一节点上处理数据的哪些部分.
  • 您可以进行一些更高级的转换.例如,配置设置spark.sql.shuffle.partitions(默认设置为200)确定DataFrame连接和聚合产生的分区数

高速缓存

前面的内容只涉及Spark中数据的标准处理,但我觉得你可能会因为Spark被称为"内存中"而错过了想法,所以我想解决这个问题.默认情况下,Spark中没有任何内容比任何其他数据处理工具更"内存":sc.textFile(foo).map(mapFunc).saveTextFile(bar)读取文件(逐块并分布在节点上)的简单示例,内存中的映射(如任何计算机程序) )然后再将其保存到存储器中.Spark对内存的使用在下面变得更加有趣(在Scala中,因为我对它更熟悉,但在Python中概念和方法名称完全相同):

val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveTextFile(outFile2)
Run Code Online (Sandbox Code Playgroud)

这里的想法是使用cache()这样的预处理不必两次(可能); 默认情况下,Spark不会保存任何中间结果,但会计算每个单独操作的完整链,其中"操作"是saveTextFile调用.

我说"可能"因为实际缓存数据的能力受到节点内存的限制.Spark为缓存存储保留了一定的内存,与工作内存分开(参见http://spark.apache.org/docs/latest/configuration.html#memory-management如何管理这些内存部分的大小),并且只能缓存该数量可以容纳的数量.

根据您的分区,它可能会更少.假设您的3个节点中每个节点都有2GB的存储空间,数据preprocessed为6GB.如果此数据有3个分区,则它将完全适合,并且所有输入数据mapFunc2将从内存加载.但是如果你说有4个分区,每个分区1.5Gb,每个节点上只能缓存1个分区; 第四个分区将不适合仍然留在每台机器上的0.5GB,因此必须为第二个映射重新计算此分区,并且只有3/4的预处理数据将从内存中读取.

因此,从这个意义上来说,最好有许多小分区,以尽可能提高缓存效率,但这可能还有其他缺点:如果您碰巧使用具有细粒度模式的Mesos,以及大量小输出文件,则会产生更多开销,巨大延迟(如果在保存之前没有合并),Spark会将每个分区保存为单独的文件.

正如Durga所提到的那样,也有可能将数据不适合内存泄漏到磁盘,你可以按照他的链接:)


Dur*_*aju 1

默认情况下,存储级别是MEMORY_ONLY,它将尝试将数据放入内存中。如果数据无法装入内存,它将因内存不足问题而失败。

它支持其他存储级别,例如MEMORY_AND_DISK、DISK_ONLY等。您可以通过Spark文档了解不同的存储级别。您可以在 RDD 上调用 persist 函数来使用不同的存储级别。