I have multiple jobs that I want to execute in parallel, each appending daily data into the same path using partitioning.
For example:
dataFrame.write()
    .partitionBy("eventDate", "category")
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path");
Job 1 - category = "billing_events"
Job 2 - category = "click_events"
Both jobs truncate any existing partitions in the S3 bucket before executing, then save the resulting Parquet files to their respective partitions,
i.e.
job 1 -> s3://bucket/save/path/eventDate=20160101/category=billing_events
job 2 -> s3://bucket/save/path/eventDate=20160101/category=click_events
The problem I am facing is the temporary files Spark creates during job execution. It stages in-progress files under the base path:
s3://bucket/save/path/_temporary/...
As a result, both jobs end up sharing the same temporary folder, which causes conflicts. I have noticed that one job can delete temporary files the other job still expects, making the second job fail with a 404 from S3 saying the expected temporary file does not exist.
Has anyone run into this problem and come up with a strategy for executing jobs in parallel against the same base path?
I am currently using Spark 1.6.0.
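One workaround often suggested for Spark 1.6 writing Parquet to S3 is to switch to the direct output committer, so each task writes straight to its final partition location instead of staging through the shared _temporary directory. This is a sketch of one option, not a confirmed fix for the setup above, and the committer's package has moved between Spark releases, so verify the class name against your version:

```properties
# Spark 1.6.x: write Parquet output directly to the final location,
# bypassing the shared _temporary directory the two jobs fight over.
spark.sql.parquet.output.committer.class=org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter
# The direct committer cannot clean up after failed or re-attempted tasks,
# so speculative execution must be disabled alongside it.
spark.speculation=false
```

The trade-off is atomicity: a failed job can leave partial files behind in the final location. Writing each job to its own distinct base path and moving the results afterwards avoids the shared-_temporary conflict entirely, at the cost of an extra copy step.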
When I try to write a dataset to a Parquet file, I get the following error:
18/11/05 06:25:43 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 1.0 failed 4 times, most recent failure: Lost task 84.3 in stage 1.0 (TID 989, ip-10-253-194-207.nonprd.aws.csp.net, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
But when I call dataset.show() I can view the data. I am not sure where to look for the root cause.
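A decodeToInt call failing on a PlainBinaryDictionary usually means the reader expects an integer column while some of the underlying Parquet files actually encoded that column as binary/string — typically because files under the same path were written with different schemas for the same column. This is a common cause, not something the trace alone confirms; dataset.show() can still succeed because it only scans the first few rows, which may all come from files whose schema matches. One diagnostic step is to disable the vectorized Parquet reader and re-run the job:

```properties
# If the job succeeds with the vectorized reader off, the root cause is a
# type mismatch between the expected schema and what some Parquet files contain.
spark.sql.parquet.enableVectorizedReader=false
```

From there, inspecting the schemas of individual Parquet files (for example with parquet-tools) and explicitly casting the offending column to one consistent type before writing should resolve the mismatch.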