Spark:并行的多个独立动作

Nak*_*euh 5 scala apache-spark

我正在使用 Spark 2.0.0 的 SQL API。

我想知道当我必须对我的数据执行两个独立的操作时,什么是好的做法。这是一个基本示例:

val ds = sc.parallelize(List(
    ("2018-12-07T15:31:48Z", "AAA",3),
    ("2018-12-07T15:32:48Z", "AAA",25),
    ("2018-12-07T15:33:48Z", "AAA",20),
    ("2018-12-07T15:34:48Z", "AAA",10),
    ("2018-12-07T15:35:48Z", "AAA",15),
    ("2018-12-07T15:36:48Z", "AAA",16),
    ("2018-12-07T15:37:48Z", "AAA",8),
    ("2018-12-07T15:31:48Z", "BBB",15),
    ("2018-12-07T15:32:48Z", "BBB",0),
    ("2018-12-07T15:33:48Z", "BBB",0),
    ("2018-12-07T15:34:48Z", "BBB",1),
    ("2018-12-07T15:35:48Z", "BBB",8),
    ("2018-12-07T15:36:48Z", "BBB",7),
    ("2018-12-07T15:37:48Z", "BBB",6)
    )).toDF("timestamp","tag","value")

val newDs = commonTransformation(ds).cache();
newDs.count() // force computation of the dataset

val dsAAA = newDs.filter($"tag"==="AAA")
val dsBBB = newDs.filter($"tag"==="BBB")

actionAAA(dsAAA)
actionBBB(dsBBB)
Run Code Online (Sandbox Code Playgroud)

使用以下功能:

def commonTransformation(ds:Dataset[Row]):Dataset[Row]={
    ds // do multiple transformations on dataframe
}

def actionAAA(ds:Dataset[Row]){
    Thread.sleep(5000) // Sleep to simulate an action that takes time
    ds.show()  
}

def actionBBB(ds:Dataset[Row]){
    Thread.sleep(5000) // Sleep to simulate an action that takes time
    ds.show()
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,我们有一个输入数据集,其中包含由 'tag' 列标识的多个时间序列。一些转换适用于整个数据集。

然后,我想根据时间序列的标签对我的数据应用不同的操作。

在我的示例中,我得到了预期的结果,但我不得不等待很长时间才能执行我的两个操作,尽管我有可用的执行程序。

我通过使用 Java 类 Future 部分解决了这个问题,它允许我以异步方式开始我的操作。但是有了这个解决方案,如果我开始的动作比他的资源多太多,Spark 就会变得非常慢,最终花费的时间比一个一个地做动作要多。

所以现在,我的解决方案是启动多个动作,同时运行的动作的最大限制,但我觉得这不是一个好方法(而且最大限制很难猜测)。