以分布式方式在Spark中读取CSV文件

And*_*ohn 6 csv distributed apache-spark

我正在开发一个Spark处理框架,它读取大型CSV文件,将它们加载到RDD中,执行一些转换,最后保存一些统计信息.

有问题的CSV文件平均约为50GB.我正在使用Spark 2.0.

我的问题是:

当我使用sparkContext.textFile()函数加载文件时,是否需要先将文件存储在驱动程序的内存中,然后将其分发给worker(因此驱动程序需要相当大的内存)?或者每个工作人员"并行"读取文件,这样他们都不需要存储整个文件,驱动程序只能作为"管理员"?

提前致谢

Ass*_*son 9

定义读数时,文件将根据您的并行计划划分为分区,并将指令发送给工作人员.然后,文件系统中的工作人员直接读取文件(因此需要可用于所有节点(如HDFS)的分布式文件系统).

作为旁注,使用spark.read.csv而不是RDD将数据读取到数据框会好得多.这将占用更少的内存,并允许火花来优化您的查询.

UPDATE

在评论中,有人询问如果文件系统没有分发并且文件只位于一台机器上会发生什么.答案是,如果你有超过1台机器,它很可能会失败.

当你执行sparkContext.textFile时,实际上什么都没有读,它只是告诉你想要读取什么.然后你对它进行一些转换,但仍然没有读取任何内容,因为你正在定义一个计划.执行操作(例如收集)后,即开始实际处理.Spark会将作业划分为任务并将其发送给执行程序.然后,执行程序(可能位于主节点或工作节点上)将尝试读取文件的某些部分.问题是任何不在主节点上的执行程序都会查找该文件而无法找到它导致任务失败.Spark会重试几次(我相信默认值为4)然后完全失败.

当然,如果你只有一个节点,那么所有执行程序都会看到该文件,一切都会好的.同样在理论上,可能是任务在工作人员上失败然后重新运行在主人身上并在那里取得成功但在任何情况下工人都不会做任何工作,除非他们看到文件的副本.

您可以通过将文件复制到所有节点中完全相同的路径或使用任何类型的分布式文件系统来解决此问题(甚至NFS共享也很好).

当然,您始终可以在单个节点上工作,但是您不会利用spark的可扩展性.