Sam*_*man 15 scala append apache-spark parquet
我正在编写一个ETL过程,我需要读取每小时的日志文件,对数据进行分区并保存.我正在使用Spark(在Databricks中).日志文件是CSV,因此我阅读它们并应用模式,然后执行我的转换.
我的问题是,如何将每小时的数据保存为镶木地板格式,但是附加到现有数据集?保存时,我需要按数据框中的4列进行分区.
这是我的保存行:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
Run Code Online (Sandbox Code Playgroud)
问题是如果目标文件夹存在,则save会抛出错误.如果目的地不存在,那么我不会附加我的文件.
我尝试过使用.mode("append")但我发现Spark有时会在中途失败,所以我最终失去了我的数据写入量以及我还需要写多少.
我正在使用镶木地板,因为分区大大增加了我将来的查询.同样,我必须将数据写为磁盘上的某种文件格式,并且不能使用像Druid或Cassandra这样的数据库.
非常感谢有关如何划分我的数据帧和保存文件(坚持镶木地板或其他格式)的任何建议.
Gle*_*olt 12
如果需要附加文件,则必须使用追加模式.我不知道你希望它生成多少个分区,但我发现如果你有很多分区,partitionBy会导致很多问题(内存和IO问题都一样).
如果您认为您的问题是由写入操作耗时太长造成的,我建议您尝试以下两件事:
1)通过添加配置使用snappy:
conf.set("spark.sql.parquet.compression.codec", "snappy")
Run Code Online (Sandbox Code Playgroud)
2)禁止代的元数据文件的hadoopConfiguration上SparkContext是这样的:
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Run Code Online (Sandbox Code Playgroud)
生成元数据文件会有点费时(参见这篇博客文章),但根据这一点,它们实际上并不重要.就个人而言,我总是禁用它们,没有任何问题.
如果你生成了很多分区(> 500),我担心我能做的最好的事情就是建议你研究一下不使用append-mode 的解决方案- 我根本就没有设法partitionBy使用那么多分区.
| 归档时间: |
|
| 查看次数: |
14086 次 |
| 最近记录: |