在读取/加载时将原始JSON保留为Spark DataFrame中的列吗?

rev*_*end 5 json apache-spark apache-spark-sql

我一直在寻找一种在将数据读取到Spark DataFrame中时将原始(JSON)数据添加为列的方法。我有一种使用联接执行此操作的方法,但希望有一种方法可以使用Spark 2.2.x +在单个操作中执行此操作。

因此,例如数据:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}
Run Code Online (Sandbox Code Playgroud)

执行时:

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show
Run Code Online (Sandbox Code Playgroud)

可以预期的是:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}
Run Code Online (Sandbox Code Playgroud)

我希望在初始加载时具有以上内容,但是将原始JSON数据作为附加列。例如(截断的原始值):

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show
Run Code Online (Sandbox Code Playgroud)

非理想的解决方案涉及联接:

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs).withColumn("uniqueID",monotonically_increasing_id)
val rawdf = df.toJSON.withColumn("uniqueID",monotonically_increasing_id)
df.join(rawdf, "uniqueID")
Run Code Online (Sandbox Code Playgroud)

结果得到与上述相同的数据帧,但带有和添加的uniqueID列。此外,json是从DF呈现的,不一定是“原始”数据。实际上,它们是相等的,但是对于我的用例,实际的原始数据是可取的。

有谁知道一种解决方案,它将在加载时捕获原始JSON数据作为附加列?

Sha*_*ala 5

如果您有接收到的数据的架构,那么您可以使用from_jsonwithschema来获取所有字段并保持该raw字段原样

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true)::
  StructField("colors", StringType, true)::
  StructField("eliminated", StringType, true)::
  StructField("origin", StringType, true)::Nil
)

logs.toDF("values")
    .withColumn("json", from_json($"values", schema))
    .select("values", "json.*")

    .show(false)
Run Code Online (Sandbox Code Playgroud)

输出:

+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|values                                                                  |team          |colors        |eliminated|origin   |
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
Run Code Online (Sandbox Code Playgroud)

希望他的帮助!

  • 不幸的是,源数据是日志流并且未定义模式。此外,可以随时在流中的日志行中添加或删除新属性。 (2认同)