用于大数据数据处理的分布式计算

exp*_*ent 7 scala distributed-computing apache-spark

我有一个庞大的时间序列数据,我想使用spark的并行处理/分布式计算进行数据处理.要求是逐行查看数据,以确定下面指定的组在所需的结果部分下,如果没有执行者之间的某种协调,我真的无法获得分配这一点的火花

t- timeseries datetime sample,
lat-latitude,
long-longitude
Run Code Online (Sandbox Code Playgroud)


例如:采用一小部分样本数据集来解释案例

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
30  27  28 
Run Code Online (Sandbox Code Playgroud)

期望的输出应该是:

Lat-long    interval
(27,28) (0,10)
(29,49) (15,20)
(27,28) (25,30)
Run Code Online (Sandbox Code Playgroud)

我可以使用这段代码获得所需的结果

val spark = SparkSession.builder().master("local").getOrCreate()

import spark.implicits._

 val df = Seq(
  (0, 27,28),
  (5, 27,28),
  (10, 27,28),
  (15, 26,49),
  (20, 26,49),
  (25, 27,28),
  (30, 27,28)
).toDF("t", "lat","long")

val dfGrouped = df
.withColumn("lat-long", struct($"lat", $"long"))

val wAll = Window.partitionBy().orderBy($"t".asc)

dfGrouped.withColumn("lag", lag("lat-long", 1, null).over(wAll))
.orderBy(asc("t")).withColumn("detector", when($"lat-long" === $"lag", 0)
    .otherwise(1)).withColumn("runningTotal", sum("detector").over(wAll))
.groupBy("runningTotal", "lat-long").agg(struct(min("t"), max("t")).as("interval"))
.drop("runningTotal").show
}
Run Code Online (Sandbox Code Playgroud)

但是,如果数据进入两个执行器,那么数据就像

执行人1中的数据:

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
Run Code Online (Sandbox Code Playgroud)

执行人2中的数据:

t   lat long
30   27  28
Run Code Online (Sandbox Code Playgroud)


我应该如何获得大量数据的所需输出.必须有更聪明的方法来实现这一点,通过执行器之间的某种协调来分配它以获得该结果.

请指导我一个正确的方向,我已经研究了相同但无法找到解决方案.

PS:这只是一个示例.

小智 -1

您可以使用 UDAF 解决此问题。首先,您可以添加一列,该列代表在您拥有的多个执行器中分区的 t 列。类似于 executorIndex = t % ((max(t) - min(t)) / numExecutors)。

然后您可以通过 executorIndex 应用 UDAF 分组。

您的 UDAF 需要存储一个带有 String 键(例如)的 Map,该键代表一对纬度和经度,以及一个 int[] 代表该经纬度键的 maxT 和 minT。

请询问您是否需要更广泛的解释。

希望这有帮助...

PS:我认为相同的纬度和经度之间存在一些时间关系,如果您正在跟踪某些运动,这是正常的......

  • 感谢您的努力,但请提供更广泛的解释。 (2认同)