yog*_*oga 3 hadoop apache-spark pyspark
我们假设我有一个Tb数据文件.十个节点集群中的每个节点内存为3GB.
我想用spark处理文件.但One TeraByte如何适应记忆?
它会丢失内存异常吗?
它是如何工作的?
正如Thilo所提到的,Spark不需要在内存中加载所有内容以便能够处理它.这是因为Spark会将数据划分为更小的块并分别对这些块进行操作.分区数量,这个大小取决于几个方面:
FileSystem用于读取文件并应用于其他文件系统的文件,Spark使用Hadoop来处理这些文件.repartition(N)或coalesce(N)在RDD或DataFrame上调整分区数量,从而更改分区大小.coalesce对于在不跨节点的数据进行混洗的情况下减少数量是首选,而repartition允许您指定一种新的方式来分割数据,即更好地控制在同一节点上处理数据的哪些部分.spark.sql.shuffle.partitions(默认设置为200)确定DataFrame连接和聚合产生的分区数前面的内容只涉及Spark中数据的标准处理,但我觉得你可能会因为Spark被称为"内存中"而错过了想法,所以我想解决这个问题.默认情况下,Spark中没有任何内容比任何其他数据处理工具更"内存":sc.textFile(foo).map(mapFunc).saveTextFile(bar)读取文件(逐块并分布在节点上)的简单示例,内存中的映射(如任何计算机程序) )然后再将其保存到存储器中.Spark对内存的使用在下面变得更加有趣(在Scala中,因为我对它更熟悉,但在Python中概念和方法名称完全相同):
val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveTextFile(outFile2)
Run Code Online (Sandbox Code Playgroud)
这里的想法是使用cache()这样的预处理不必两次(可能); 默认情况下,Spark不会保存任何中间结果,但会计算每个单独操作的完整链,其中"操作"是saveTextFile调用.
我说"可能"因为实际缓存数据的能力受到节点内存的限制.Spark为缓存存储保留了一定的内存,与工作内存分开(参见http://spark.apache.org/docs/latest/configuration.html#memory-management如何管理这些内存部分的大小),并且只能缓存该数量可以容纳的数量.
根据您的分区,它可能会更少.假设您的3个节点中每个节点都有2GB的存储空间,数据preprocessed为6GB.如果此数据有3个分区,则它将完全适合,并且所有输入数据mapFunc2将从内存加载.但是如果你说有4个分区,每个分区1.5Gb,每个节点上只能缓存1个分区; 第四个分区将不适合仍然留在每台机器上的0.5GB,因此必须为第二个映射重新计算此分区,并且只有3/4的预处理数据将从内存中读取.
因此,从这个意义上来说,最好有许多小分区,以尽可能提高缓存效率,但这可能还有其他缺点:如果您碰巧使用具有细粒度模式的Mesos,以及大量小输出文件,则会产生更多开销,巨大延迟(如果在保存之前没有合并),Spark会将每个分区保存为单独的文件.
正如Durga所提到的那样,也有可能将数据不适合内存泄漏到磁盘,你可以按照他的链接:)
| 归档时间: |
|
| 查看次数: |
4861 次 |
| 最近记录: |