Spark:并行转换多个数据帧

Fal*_*con 5 apache-spark

了解如何在并行转换多个数据帧时实现最佳并行性

我有一系列路径

val paths = Array("path1", "path2", .....
Run Code Online (Sandbox Code Playgroud)

我从每个路径加载数据帧,然后转换并写入目标路径

paths.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})
Run Code Online (Sandbox Code Playgroud)

转换processData独立于我正在加载的数据帧。

这限制了一次处理一个数据帧,并且我的大部分集群资源都处于空闲状态。由于处理每个数据帧是独立的,因此我转换ArrayParArrayscala。

paths.par.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})
Run Code Online (Sandbox Code Playgroud)

现在它在集群中使用更多的资源。我仍在尝试了解它是如何工作的以及如何在这里微调并行处理

  1. 如果我将默认的 scala 并行度增加到ForkJoinPool更高的数量,是否会导致在驱动程序端生成更多线程,并将处于锁定状态等待foreach函数完成并最终杀死驱动程序?

  2. 它如何影响集中式 Spark 事物,例如EventLoggingListnener在并行处理多个数据帧时需要处理更多事件流入。

  3. 为了实现最佳资源利用,我需要考虑哪些参数。

  4. 任何其他方法

我可以通过了解这种扩展的任何资源都会非常有帮助

Oli*_*Oli 4

速度慢的原因是 Spark 非常擅长对存储在一个大数据帧中的大量数据进行并行计算。然而,它在处理大量数据帧方面非常糟糕。它将使用其所有执行器开始计算(即使并非全部需要),并等待它完成后再开始下一个执行器。这会导致大量不活动的处理器。这很糟糕,但这不是 Spark 的设计目的。

我有一个技巧给你。可能需要稍微完善一下,但你会有这个想法。这就是我要做的。从路径列表中,我将提取镶木地板文件的所有架构,并创建一个收集所有列的新大架构。然后,我会要求 Spark 使用此模式读取所有镶木地板文件(不存在的列将自动设置为 null)。然后,我将合并所有数据帧并对这个大数据帧执行转换,最后用于partitionBy将数据帧存储在单独的文件中,同时仍然并行执行所有操作。它看起来像这样。

// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id|  x |
+---+----+
|  0|   0|
|  1|  10|
|  2|  20|
+---+----+

val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  1|
+---+---+

// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
Run Code Online (Sandbox Code Playgroud)
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
    .flatMap(path => spark.read.parquet(path).schema.fields)
    .toSet //removing duplicates
    .toArray
val big_schema = StructType(fields)

// and let's use it
val dfs = paths.map{ path => 
    spark.read
        .schema(big_schema)
        .parquet(path)
        .withColumn("path", lit(path.split("/").last))
}

// Then we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id|   x|   y|      file|
+---+----+----+----------+
|  1|   1|null|d1.parquet|
|  2|   2|null|d1.parquet|
|  0|   0|null|d1.parquet|
|  0|null|   0|d2.parquet|
|  1|null|   1|d2.parquet|
+---+----+----+----------+
Run Code Online (Sandbox Code Playgroud)

然而,我不建议unionAll在大量数据帧上使用。由于 Spark 对执行计划的分析,对于许多数据帧,它可能会非常慢。我会使用 RDD 版本,尽管它更冗长。

val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds, 
    big_schema.add(StructField("path", StringType, true)))
transform(big_df)
    .write
    .partitionBy("path")
    .parquet("hdfs:///tmp/processed.parquet")
Run Code Online (Sandbox Code Playgroud)

看看我处理过的目录,我得到这个:

hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet
Run Code Online (Sandbox Code Playgroud)