如何在纱线客户端处理运行时间过长的任务(与工作中的其他人相比)?

tnk*_*eka 13 hadoop-yarn apache-spark parquet

我们使用Spark集群yarn-client来计算几个业务,但有时我们的任务运行时间太长:

在此输入图像描述

我们没有设置超时,但我认为默认超时火花任务不会太长,这里(1.7h).

有人给我一个解决这个问题的理想吗?

zen*_*ngr 20

如果花费太长时间,就没有办法让火花杀死它的任务.

但我找到了一种方法来处理这个使用猜测,

这意味着如果一个或多个任务在一个阶段中运行缓慢,它们将被重新启动.

spark.speculation                  true
spark.speculation.multiplier       2
spark.speculation.quantile         0
Run Code Online (Sandbox Code Playgroud)

注意:spark.speculation.quantile意味着"推测"将从您的第一个任务开始.所以要谨慎使用它.我正在使用它,因为随着时间的推移,一些工作因GC而变慢.所以我认为你应该知道何时使用它 - 它不是一颗银弹.

一些相关链接:http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.htmlhttp:// mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E

更新

我找到了解决我的问题的方法(可能不适用于所有人).我有一堆模拟每个任务运行,所以我在运行中添加了超时.如果模拟花费的时间更长(由于特定运行的数据偏差),它将超时.

ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();

Future<SimResult> future = executor.submit(task);
try {
    result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    future.cancel(true);
    SPARKLOG.info("Task timed out");
}
Run Code Online (Sandbox Code Playgroud)

确保你在simulator主循环中处理一个中断,如:

if(Thread.currentThread().isInterrupted()){
    throw new InterruptedException();
} 
Run Code Online (Sandbox Code Playgroud)