在spark scala 中将行合并为单个struct 列存在效率问题,我们如何做得更好?

ale*_*ree 5 scala apache-spark

我正在尝试加快和限制获取多列及其值并将它们插入到同一行的地图中的成本。这是一项要求,因为我们有一个正在读取此作业的遗留系统,但尚未准备好进行重构。还有另外一张地图,有一些数据需要与此结合。

目前,我们有一些解决方案,所有这些解决方案似乎都会在同一集群上产生大约相同的运行时间,其中大约 1TB 的数据存储在 Parquet 中:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._

def jsonToMap(s: String, map: Map[String, String]): Map[String, String] = { 
  implicit val formats = org.json4s.DefaultFormats
    val jsonMap = if(!s.isEmpty){
      parse(s).extract[Map[String, String]]
    } else {
      Map[String, String]()
    }
    if(map != null){
      map ++ jsonMap
    } else {
      jsonMap
    }
  }
val udfJsonToMap = udf(jsonToMap _)

def addMap(key:String, value:String, map: Map[String,String]): Map[String,String] = {
  if(map == null) {
    Map(key -> value)
  } else {
    map + (key -> value)
  }
}

val addMapUdf = udf(addMap _)

val output = raw.columns.foldLeft(raw.withColumn("allMap", typedLit(Map.empty[String, String]))) { (memoDF, colName) =>
    if(colName.startsWith("columnPrefix/")){
        memoDF.withColumn("allMap", when(col(colName).isNotNull, addMapUdf(substring_index(lit(colName), "/", -1), col(colName), col("allTagsMap")) ))
    } else if(colName.equals("originalMap")){
        memoDF.withColumn("allMap", when(col(colName).isNotNull, udfJsonToMap(col(colName), col("allMap"))))
    } else {
      memoDF
    }
}
Run Code Online (Sandbox Code Playgroud)

在 9 m5.xlarge 上大约需要 1 小时

val resourceTagColumnNames = raw.columns.filter(colName => colName.startsWith("columnPrefix/"))
def structToMap: Row => Map[String,String] = { row =>
  row.getValuesMap[String](resourceTagColumnNames)
}
val structToMapUdf = udf(structToMap)

val experiment = raw
  .withColumn("allStruct", struct(resourceTagColumnNames.head, resourceTagColumnNames.tail:_*))
  .select("allStruct")
  .withColumn("allMap", structToMapUdf(col("allStruct")))
  .select("allMap")
Run Code Online (Sandbox Code Playgroud)

也在同一个集群上运行大约 1 小时

这段代码一切正常,但速度不够快,它比我们现在拥有的所有其他变换长约 10 倍,这对我们来说是一个瓶颈。

有没有另一种更有效的方法来获得这个结果?

编辑:我也尝试通过键限制数据,但是因为尽管键保持不变,我正在合并的列中的值可能会发生变化,我无法在不冒数据丢失风险的情况下限制数据大小。

Vin*_*oba 1

Tl;DR:仅使用 Spark sql 内置函数可以显着加快计算速度

如此答案中所解释的,spark sql 本机函数比用户定义的函数性能更高。因此,我们可以尝试仅使用 Spark sql 本机函数来实现您的问题的解决方案。

我展示了两个主要的实现版本。一个使用我写这个答案时可用的 Spark 最新版本(即 Spark 3.0)中存在的所有 sql 函数。当提出问题时,另一个仅使用 Spark 版本中存在的 sql 函数,因此 Spark 2.3 中存在函数。该版本使用的所有功能在Spark 2.2中也可用

Spark 3.0用sql函数实现

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

val mapFromPrefixedColumns = map_filter(
  map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*),
  (_, v) => v.isNotNull
)

val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)

val comprehensiveMapExpr = map_concat(mapFromPrefixedColumns, mapFromOriginalMap)

raw.withColumn("allMap", comprehensiveMapExpr)
Run Code Online (Sandbox Code Playgroud)

Spark 2.2 用sql函数实现

在spark 2.2中,我们没有map_concat(spark 2.4中可用)和map_filter(spark 3.0中可用)的功能。我用用户定义的函数替换它们:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

def filterNull(map: Map[String, String]): Map[String, String] = map.toSeq.filter(_._2 != null).toMap
val filter_null_udf = udf(filterNull _)

def mapConcat(map1: Map[String, String], map2: Map[String, String]): Map[String, String] = map1 ++ map2
val map_concat_udf = udf(mapConcat _)

val mapFromPrefixedColumns = filter_null_udf(
  map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*)
)

val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)

val comprehensiveMapExpr = map_concat_udf(mapFromPrefixedColumns, mapFromOriginalMap)

raw.withColumn("allMap", comprehensiveMapExpr)
Run Code Online (Sandbox Code Playgroud)

使用sql函数实现,无需json映射

问题的最后部分包含一个简化的代码,没有映射 json 列,也没有过滤结果映射中的空值。我针对这个特定案例创建了以下实现。由于我不使用 Spark 2.2 和 Spark 3.0 之间添加的函数,因此我不需要此实现的两个版本:

import org.apache.spark.sql.functions._

val mapFromPrefixedColumns = map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c), col(c))): _*)
raw.withColumn("allMap", mapFromPrefixedColumns)
Run Code Online (Sandbox Code Playgroud)

跑步

对于以下数据框作为输入:

+--------------------+--------------------+--------------------+----------------+
|columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|originalMap     |
+--------------------+--------------------+--------------------+----------------+
|a                   |1                   |x                   |{"column4": "k"}|
|b                   |null                |null                |null            |
|c                   |null                |null                |{}              |
|null                |null                |null                |null            |
|d                   |2                   |null                |                |
+--------------------+--------------------+--------------------+----------------+
Run Code Online (Sandbox Code Playgroud)

我得到以下列allMap

+--------------------------------------------------------+
|allMap                                                  |
+--------------------------------------------------------+
|[column1 -> a, column2 -> 1, column3 -> x, column4 -> k]|
|[column1 -> b]                                          |
|[column1 -> c]                                          |
|[]                                                      |
|[column1 -> d, column2 -> 2]                            |
+--------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

对于没有 json 列的映射:

+---------------------------------------------------------------------------------+
|allMap                                                                           |
+---------------------------------------------------------------------------------+
|[columnPrefix/column1 -> a, columnPrefix/column2 -> 1, columnPrefix/column3 -> x]|
|[columnPrefix/column1 -> b, columnPrefix/column2 ->, columnPrefix/column3 ->]    |
|[columnPrefix/column1 -> c, columnPrefix/column2 ->, columnPrefix/column3 ->]    |
|[columnPrefix/column1 ->, columnPrefix/column2 ->, columnPrefix/column3 ->]      |
|[columnPrefix/column1 -> d, columnPrefix/column2 -> 2, columnPrefix/column3 ->]  |
+---------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

基准

我生成了一个 1000 万行的 csv 文件,未压缩(大约 800 Mo),包含一列没有列前缀,九列有列前缀,以及一个包含 json 作为字符串的冒号:

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|id |columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|columnPrefix/column4|columnPrefix/column5|columnPrefix/column6|columnPrefix/column7|columnPrefix/column8|columnPrefix/column9|originalMap        |
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|1  |iwajedhor           |ijoefzi             |der                 |ob                  |galsu               |ril                 |le                  |zaahuz              |fuzi                |{"column10":"true"}|
|2  |ofo                 |davfiwir            |lebfim              |roapej              |lus                 |roum                |te                  |javes               |karutare            |{"column10":"true"}|
|3  |jais                |epciel              |uv                  |piubnak             |saajo               |doke                |ber                 |pi                  |igzici              |{"column10":"true"}|
|4  |agami               |zuhepuk             |er                  |pizfe               |lafudbo             |zan                 |hoho                |terbauv             |ma                  |{"column10":"true"}|
...
Run Code Online (Sandbox Code Playgroud)

基准测试是读取此 csv 文件,创建列allMap,并将此列写入 parquet。我在本地机器上运行了这个,得到了以下结果

+--------------------------+--------------------+-------------------------+-------------------------+
|     implementations      | current (with udf) | sql functions spark 3.0 | sql functions spark 2.2 |
+--------------------------+--------------------+-------------------------+-------------------------+
| execution time           | 138 seconds        | 48 seconds              | 82 seconds              |
| improvement from current | 0 % faster         | 64 % faster             | 40 % faster             |
+--------------------------+--------------------+-------------------------+-------------------------+
Run Code Online (Sandbox Code Playgroud)

我还针对问题中的第二个实现进行了测试,该实现删除了 json 列的映射和 map 中 null 值的过滤。

+--------------------------+-----------------------+------------------------------------+
| implementations          | current (with struct) | sql functions without json mapping |
+--------------------------+-----------------------+------------------------------------+
| execution time           | 46 seconds            | 35 seconds                         |
| improvement from current | 0 %                   | 23 % faster                        |
+--------------------------+-----------------------+------------------------------------+
Run Code Online (Sandbox Code Playgroud)

当然,基准测试相当基础,但与使用用户定义函数的实现相比,我们可以看到改进

结论

当您遇到性能问题并且使用用户定义函数时,尝试用 Spark sql 函数替换这些用户定义函数可能是个好主意