spark scala:将数据帧或数据集转换为单个逗号分隔的字符串

NS *_*nan 6 java scala apache-spark spark-dataframe

下面是将打印一列 DataSet[Row] 的 spark scala 代码:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
        .appName("Spark DataValidation")
        .config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
        .getOrCreate()

val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID" 
val pkValues = spark
        .read
        .json(kafkaPath)
        .select("message.data.*")
        .select(pk)
        .distinct() 
pkValues.show()
Run Code Online (Sandbox Code Playgroud)

关于代码的输出:

+--------------+
|APPLICATION_ID|
+--------------+
|           388|
|           447|
|           346|
|           861|
|           361|
|           557|
|           482|
|           518|
|           432|
|           422|
|           533|
|           733|
|           472|
|           457|
|           387|
|           394|
|           786|
|           458|
+--------------+
Run Code Online (Sandbox Code Playgroud)

题 :

如何将此数据框转换为逗号分隔的字符串变量?

预期输出:

val   data:String= "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"
Run Code Online (Sandbox Code Playgroud)

请建议如何将 DataFrame[Row] 或 Dataset 转换为一个 String 。

SCo*_*uto 6

我认为这不是一个好主意,因为 dataFrame 是一个分布式对象并且可能很大。Collect将把所有数据都带到驱动程序中,因此您应该谨慎执行此类操作。

以下是您可以使用 dataFrame 执行的操作(两个选项):

df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.mkString(",")
Run Code Online (Sandbox Code Playgroud)

结果与只有 3 行的测试数据帧:

String = 388,447,346
Run Code Online (Sandbox Code Playgroud)

编辑:使用 DataSet 您可以直接执行以下操作:

ds.collect.mkString(",")
Run Code Online (Sandbox Code Playgroud)

  • 即使使用 DataFrame 也不需要去 RDD API(因为他似乎使用 spark 2),你可以省略 `.rdd` 甚至做 `df.select("APPLICATION_ID").as[String].collect。 mkString(",")` (2认同)