相关疑难解决方法(0)

Spark在groupBy/aggregate中合并/组合数组

以下Spark代码正确演示了我想要做的事情,并使用一个小的演示数据集生成正确的输出.

当我在大量生产数据上运行相同的一般类型的代码时,我遇到了运行时问题.Spark作业在我的集群上运行约12个小时并失败.

只是看一下下面的代码,爆炸每一行似乎都是低效的,只是将它合并回来.在给定的测试数据集中,第四行包含array_value_1中的三个值和array_value_2中的三个值,它们将爆炸为3*3或9个爆炸行.

那么,在一个更大的数据集中,一行有五个这样的数组列,每列有十个值,会爆炸成10 ^ 5个爆炸行?

查看提供的Spark函数,没有开箱即用的功能可以实现我想要的功能.我可以提供用户定义的功能.这有什么速度缺点吗?

val sparkSession = SparkSession.builder.
  master("local")
  .appName("merge list test")
  .getOrCreate()

val schema = StructType(
  StructField("category", IntegerType) ::
    StructField("array_value_1", ArrayType(StringType)) ::
    StructField("array_value_2", ArrayType(StringType)) ::
    Nil)

val rows = List(
  Row(1, List("a", "b"), List("u", "v")),
  Row(1, List("b", "c"), List("v", "w")),
  Row(2, List("c", "d"), List("w")),
  Row(2, List("c", "d", "e"), List("x", "y", "z"))
)

val df = sparkSession.createDataFrame(rows.asJava, schema)

val dfExploded = df.
  withColumn("scalar_1", explode(col("array_value_1"))).
  withColumn("scalar_2", explode(col("array_value_2")))

// This will output 19. 2*2 + 2*2 + 2*1 + …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

9
推荐指数
1
解决办法
1万
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1

scala ×1