如何知道Apache Spark中当前正在运行的作业的哪个阶段?

KOU*_*DAL 12 java scala bigdata apache-spark

考虑一下我在Spark有一份工作;

CSV文件 ==> 按列过滤 ==> 取样 ==> 另存为JSON

现在我的要求是如何知道作业当前正在以编程方式执行哪个步骤(获取文件过滤采样)(最好使用Java API)?这有什么办法吗?

我可以使用SparkListener类跟踪Job,Stage和Task .它可以像跟踪阶段ID一样完成.但是如何知道哪个阶段Id适用于工作链中的哪一步.

考虑按列过滤完成后,我想向用户发送通知.为此,我创建了一个扩展SparkListener类的类.但我无法从中找到当前正在执行的转换名称的名称.有可能跟踪吗?

public class ProgressListener extends SparkListener{

  @Override
  public void onJobStart(SparkListenerJobStart jobStart)
  {

  }

  @Override
  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted)
  {
      //System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only
  }

  @Override
  public void onTaskStart(SparkListenerTaskStart taskStart)
  {
      //no such method like taskStart.name()
  }
}
Run Code Online (Sandbox Code Playgroud)

hag*_*age 5

您无法确切地知道例如过滤操作何时开始或结束。

那是因为你有转换(filtermap,...)和行动(countforeach,...)。Spark将把尽可能多的操作置于一个阶段。然后,在输入的不同分区上并行执行该阶段。问题来了。

假设您有几个工人和以下程序

LOAD ==> MAP ==> FILTER ==> GROUP BY +聚合

该程序可能分为两个阶段:第一阶段将加载文件并应用mapfilter。然后,将对输出进行混洗以创建组。在第二阶段,将执行聚合。

现在的问题是,您有几个工作人员,每个工作人员将并行处理部分输入数据。也就是说,集群中的每个执行者都将收到程序的副本(当前阶段),并在分配的分区上执行该副本。

您会看到,您将拥有mapfilter运算符的多个实例,这些实例可以并行执行,但不一定要同时执行。在极端情况下,工作人员1将在工作人员20完全开始之前完成阶段1(因此,其filter工作将在工作人员20之前完成)。

对于RDD,Spark 在阶段内使用迭代器模型。但是,对于最新Spark版本的数据集,它们会在分区上创建一个循环并执行转换。这意味着在这种情况下,Spark本身并不真正知道转换运算符何时完成单个任务!

长话短说:

  1. 您不知道阶段内的操作何时完成
  2. 即使可以,也有多个实例将在不同的时间结束。

所以,现在我已经遇到了同样的问题:

在我们的Piglet项目中(请允许进行一些对抗;-)),我们从Pig Latin脚本生成了Spark代码,并希望对脚本进行概要分析。我最后mapPartition在所有用户操作符之间插入操作符,这些操作符将分区ID和当前时间发送到服务器,服务器将评估消息。但是,此解决方案也有其局限性...我还不完全满意。

但是,除非能够修改程序,否则恐怕无法实现所需的功能。