KOU*_*DAL 12 java scala bigdata apache-spark
考虑一下我在Spark有一份工作;
CSV文件 ==> 按列过滤 ==> 取样 ==> 另存为JSON
现在我的要求是如何知道作业当前正在以编程方式执行哪个步骤(获取文件或过滤或采样)(最好使用Java API)?这有什么办法吗?
我可以使用SparkListener类跟踪Job,Stage和Task .它可以像跟踪阶段ID一样完成.但是如何知道哪个阶段Id适用于工作链中的哪一步.
考虑按列过滤完成后,我想向用户发送通知.为此,我创建了一个扩展SparkListener类的类.但我无法从中找到当前正在执行的转换名称的名称.有可能跟踪吗?
public class ProgressListener extends SparkListener{
@Override
public void onJobStart(SparkListenerJobStart jobStart)
{
}
@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted)
{
//System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only
}
@Override
public void onTaskStart(SparkListenerTaskStart taskStart)
{
//no such method like taskStart.name()
}
}
Run Code Online (Sandbox Code Playgroud)
您无法确切地知道例如过滤操作何时开始或结束。
那是因为你有转换(filter,map,...)和行动(count,foreach,...)。Spark将把尽可能多的操作置于一个阶段。然后,在输入的不同分区上并行执行该阶段。问题来了。
假设您有几个工人和以下程序
LOAD ==> MAP ==> FILTER ==> GROUP BY +聚合
该程序可能分为两个阶段:第一阶段将加载文件并应用map和filter。然后,将对输出进行混洗以创建组。在第二阶段,将执行聚合。
现在的问题是,您有几个工作人员,每个工作人员将并行处理部分输入数据。也就是说,集群中的每个执行者都将收到程序的副本(当前阶段),并在分配的分区上执行该副本。
您会看到,您将拥有map和filter运算符的多个实例,这些实例可以并行执行,但不一定要同时执行。在极端情况下,工作人员1将在工作人员20完全开始之前完成阶段1(因此,其filter工作将在工作人员20之前完成)。
对于RDD,Spark 在阶段内使用迭代器模型。但是,对于最新Spark版本的数据集,它们会在分区上创建一个循环并执行转换。这意味着在这种情况下,Spark本身并不真正知道转换运算符何时完成单个任务!
长话短说:
所以,现在我已经遇到了同样的问题:
在我们的Piglet项目中(请允许进行一些对抗;-)),我们从Pig Latin脚本生成了Spark代码,并希望对脚本进行概要分析。我最后mapPartition在所有用户操作符之间插入操作符,这些操作符将分区ID和当前时间发送到服务器,服务器将评估消息。但是,此解决方案也有其局限性...我还不完全满意。
但是,除非能够修改程序,否则恐怕无法实现所需的功能。
| 归档时间: |
|
| 查看次数: |
2054 次 |
| 最近记录: |