多个火花作业通过分区将镶木地板数据附加到相同的基本路径

vce*_*ick 19 apache-spark parquet

我有多个要并行执行的作业,它们使用分区将每日数据附加到同一路径中.

例如

dataFrame.write().
         partitionBy("eventDate", "category")
            .mode(Append)
            .parquet("s3://bucket/save/path");
Run Code Online (Sandbox Code Playgroud)

工作1 - category ="billing_events"工作2 - category ="click_events"

这两个作业都将在执行之前截断s3存储桶中存在的任何现有分区,然后将生成的镶木地板文件保存到各自的分区.

作业1 - > s3:// bucket/save/path/eventDate = 20160101/channel = billing_events

job 2 - > s3:// bucket/save/path/eventDate = 20160101/channel = click_events

我面临的问题是在作业执行期间由spark创建的临时文件.它将处理文件保存到基本路径

S3://桶/保存/路/ _temporary/...

因此两个作业最终共享相同的临时文件夹并导致冲突,我注意到这可能导致一个作业删除临时文件,而另一个作业失败,来自s3的404表示预期的临时文件不存在.

有没有人遇到过这个问题,并提出了在同一个基本路径中并行执行作业的策略?

我现在使用spark 1.6.0

vce*_*ick 16

所以在经过多次阅读有关如何解决这个问题之后,我认为id会将一些智慧转移到这里来包装.多谢Tal的评论.

我还发现直接写入s3:// bucket/save/path似乎很危险,因为如果一个作业被杀死并且临时文件夹的清理不会在作业结束时发生,那么它似乎就在那里下一个工作,我注意到有时以前被杀死的工作temp的文件落在s3://桶/保存/路径中并导致重复...完全不可靠......

此外,将_temporary文件夹文件重命名为相应的s3文件需要花费大量时间(每个文件约1秒),因为S3仅支持复制/删除不重命名.此外,只有驱动程序实例使用单个线程重命名这些文件,因此只有1/5的具有大量文件/分区的作业只等待重命名操作.

出于多种原因,我已经排除了使用DirectOutputCommitter的可能性.

  1. 与推测模式结合使用时会导致重复(https://issues.apache.org/jira/browse/SPARK-9899)
  2. 任务失败会留下混乱,以后无法找到和删除/清理.
  3. Spark 2.0已完全取消对此的支持,并且不存在升级路径.(https://issues.apache.org/jira/browse/SPARK-10063)

执行这些作业的唯一安全,高效且一致的方法是首先将它们保存到hdfs中的唯一临时文件夹(由applicationId或timestamp唯一).在完成工作时复制到S3.

这允许并发作业执行,因为它们将保存到唯一的临时文件夹,不需要使用DirectOutputCommitter,因为HDFS上的重命名操作比S3快,并且保存的数据更加一致.

  • 感谢您提供这些宝贵的见解。这是比看起来应做的更为复杂的工作。 (2认同)
  • @vcetinick 我也面临这个问题。感谢您的解决方案。我们在 Spark 2.4.4 和 Hadoop 2.8.5 中有解决方案吗? (2认同)