Nak*_*euh 5 scala apache-spark
我正在使用 Spark 2.0.0 的 SQL API。
我想知道当我必须对我的数据执行两个独立的操作时,什么是好的做法。这是一个基本示例:
val ds = sc.parallelize(List(
("2018-12-07T15:31:48Z", "AAA",3),
("2018-12-07T15:32:48Z", "AAA",25),
("2018-12-07T15:33:48Z", "AAA",20),
("2018-12-07T15:34:48Z", "AAA",10),
("2018-12-07T15:35:48Z", "AAA",15),
("2018-12-07T15:36:48Z", "AAA",16),
("2018-12-07T15:37:48Z", "AAA",8),
("2018-12-07T15:31:48Z", "BBB",15),
("2018-12-07T15:32:48Z", "BBB",0),
("2018-12-07T15:33:48Z", "BBB",0),
("2018-12-07T15:34:48Z", "BBB",1),
("2018-12-07T15:35:48Z", "BBB",8),
("2018-12-07T15:36:48Z", "BBB",7),
("2018-12-07T15:37:48Z", "BBB",6)
)).toDF("timestamp","tag","value")
val newDs = commonTransformation(ds).cache();
newDs.count() // force computation of the dataset
val dsAAA = newDs.filter($"tag"==="AAA")
val dsBBB = newDs.filter($"tag"==="BBB")
actionAAA(dsAAA)
actionBBB(dsBBB)
Run Code Online (Sandbox Code Playgroud)
使用以下功能:
def commonTransformation(ds:Dataset[Row]):Dataset[Row]={
ds // do multiple transformations on dataframe
}
def actionAAA(ds:Dataset[Row]){
Thread.sleep(5000) // Sleep to simulate an action that takes time
ds.show()
}
def actionBBB(ds:Dataset[Row]){
Thread.sleep(5000) // Sleep to simulate an action that takes time
ds.show()
}
Run Code Online (Sandbox Code Playgroud)
在此示例中,我们有一个输入数据集,其中包含由 'tag' 列标识的多个时间序列。一些转换适用于整个数据集。
然后,我想根据时间序列的标签对我的数据应用不同的操作。
在我的示例中,我得到了预期的结果,但我不得不等待很长时间才能执行我的两个操作,尽管我有可用的执行程序。
我通过使用 Java 类 Future 部分解决了这个问题,它允许我以异步方式开始我的操作。但是有了这个解决方案,如果我开始的动作比他的资源多太多,Spark 就会变得非常慢,最终花费的时间比一个一个地做动作要多。
所以现在,我的解决方案是启动多个动作,同时运行的动作的最大限制,但我觉得这不是一个好方法(而且最大限制很难猜测)。
| 归档时间: |
|
| 查看次数: |
2473 次 |
| 最近记录: |