Jef*_*len 5 csv amazon-s3 apache-spark sparkr
请原谅我的简单问题,但我对Spark/Hadoop相对较新.
我正在尝试将一堆小的CSV文件加载到Apache Spark中.它们目前存储在S3中,但如果能简化,我可以在本地下载它们.我的目标是尽可能高效地完成这项工作.看起来让我的数十名Spark工作人员无所事事地让一些单线程主人下载并解析一堆CSV文件会是一种耻辱.我希望有一种惯用的方式来分发这项工作.
CSV文件排列在一个目录结构中,如下所示:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
Run Code Online (Sandbox Code Playgroud)
我有两年的数据,每天都有目录,每个目录里面都有几百个CSV.所有这些CSV应该具有相同的模式,但是当然可能一个CSV出错并且如果存在一些有问题的文件,我讨厌整个作业崩溃.只要我在某个日志中通知我发生了这些文件,就可以跳过这些文件.
似乎我想到的每个Spark项目都采用相同的形式,我不知道如何解决它.(例如,尝试读取一组制表符分隔的天气数据,或者阅读一堆日志文件来查看这些数据.)
我已经尝试过SparkR和Scala库.我真的不在乎我需要使用哪种语言; 我对使用正确的习语/工具更感兴趣.
我原来的想法是枚举和parallelize所有year/mm-dd组合的列表,以便我可以让我的Spark工作人员每天独立处理(下载并解析所有CSV文件,然后将它们堆叠在彼此之上(unionAll())以减少它们).遗憾的是,使用spark-csv库下载和解析CSV文件只能在"父"/主作业中完成,而不能在每个子项中完成,因为Spark不允许作业嵌套.因此,只要我想使用Spark库进行导入/解析,这将无法工作.
当然,您可以使用语言的本机CSV解析来读取每个文件,然后将它们"上传"到Spark.在R中,这是一些包的组合,用于从S3获取文件,然后用a read.csv完成,createDataFrame()以获取数据到Spark.不幸的是,这真的很慢,而且似乎也是我希望Spark工作的方式.如果我的所有数据都是通过R管道进入Spark之前,为什么还要费心?
我开始研究这些量身定制的工具,很快就不堪重负.我的理解是,可以使用许多/所有这些工具将我的CSV文件从S3转换为HDFS.
当然,从HDFS读取我的CSV文件比S3更快,因此解决了部分问题.但我仍然需要解析成千上万的CSV,并且不知道在Spark中使用分布式方法.
所以现在(Spark 1.4)SparkR 支持json或parquet文件结构。Csv 文件可以被解析,但是 Spark 上下文需要用一个额外的 jar 来启动(需要下载它并放置在适当的文件夹中,我自己从未这样做过,但我的同事已经这样做了)。
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
Run Code Online (Sandbox Code Playgroud)
文档中有更多信息。我预计较新的 Spark 版本将对此提供更多支持。
如果您不这样做,您将需要诉诸不同的文件结构或使用 python 将所有文件.csv从.parquet. 这是最近一次 Python 演讲中执行此操作的片段。
data = sc.textFile(s3_paths, 1200).cache()
def caster(x):
return Row(colname1 = x[0], colname2 = x[1])
df_rdd = data\
.map(lambda x: x.split(','))\
.map(caster)
ddf = sqlContext.inferSchema(df_rdd).cache()
ddf.write.save('s3n://<bucket>/<filename>.parquet')
Run Code Online (Sandbox Code Playgroud)
另外,您的数据集有多大?您甚至可能不需要 Spark 来进行分析。请注意,截至目前;
ggplot2.| 归档时间: |
|
| 查看次数: |
865 次 |
| 最近记录: |