JDe*_*suv 37 scala dataframe apache-spark apache-spark-sql spark-cassandra-connector
我有一个Cassandra表,为简单起见,看起来像:
key: text
jsonData: text
blobData: blob
Run Code Online (Sandbox Code Playgroud)
我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
Run Code Online (Sandbox Code Playgroud)
我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?
zer*_*323 72
Spark> = 2.4
如果需要,可以使用schema_of_json函数确定模式:
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))
Run Code Online (Sandbox Code Playgroud)
Spark> = 2.1
你可以使用from_json功能:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Run Code Online (Sandbox Code Playgroud)
Spark> = 1.6
您可以使用get_json_object哪个采用列和路径:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
Run Code Online (Sandbox Code Playgroud)
并将字段提取到单个字符串,这些字符串可以进一步转换为预期类型.
该path参数使用点语法$.表示,前导表示文档根(因为上面的代码使用字符串插值$必须进行转义,因此$$.).
Spark <= 1.5:
这目前可能吗?
据我所知,这不是直接可能的.你可以尝试类似的东西:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
Run Code Online (Sandbox Code Playgroud)
我假设该blob字段不能用JSON表示.否则你的出租车省略拆分和加入:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
另一种(更便宜,但更复杂)的方法是使用UDF来解析JSON并输出一个struct或map列.例如这样的事情:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)
Nic*_*mas 24
zero323 的回答是彻底的,但错过了 Spark 2.1+ 中可用的一种方法,它比使用更简单、更健壮schema_of_json():
import org.apache.spark.sql.functions.from_json
val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))
Run Code Online (Sandbox Code Playgroud)
这是 Python 的等价物:
from pyspark.sql.functions import from_json
json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))
Run Code Online (Sandbox Code Playgroud)
该问题schema_of_json(),因为zero323指出,是它检查一个字符串,并从派生的模式。如果您拥有具有不同架构的 JSON 数据,那么您返回的架构schema_of_json()将不会反映您将 DataFrame 中所有 JSON 数据的架构合并后会得到什么。解析该数据from_json()将产生大量null或空值,其中返回的模式schema_of_json()与数据不匹配。
通过使用 Spark 从 JSON 字符串的 RDD 派生出综合 JSON 模式的能力,我们可以保证所有 JSON 数据都可以被解析。
schema_of_json()vs。spark.read.json()这是一个示例(在 Python 中,Scala 的代码非常相似)来说明schema_of_json()使用spark.read.json().
>>> df = spark.createDataFrame(
... [
... (1, '{"a": true}'),
... (2, '{"a": "hello"}'),
... (3, '{"b": 22}'),
... ],
... schema=['id', 'jsonData'],
... )
Run Code Online (Sandbox Code Playgroud)
a在一行中有一个布尔值,在另一行中有一个字符串值。合并的模式a将其类型设置为字符串。b将是一个整数。
让我们看看不同的方法如何比较。一、schema_of_json()做法:
>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: boolean (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true]|
| 2| null|
| 3| []|
+---+--------+
Run Code Online (Sandbox Code Playgroud)
如您所见,我们导出的 JSON 模式非常有限。"a": "hello"无法解析为布尔值并返回null,并且"b": 22只是因为它不在我们的架构中而被删除。
现在spark.read.json():
>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: long (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true,]|
| 2|[hello,]|
| 3| [, 22]|
+---+--------+
Run Code Online (Sandbox Code Playgroud)
在这里,我们保留了所有数据,并使用了一个涵盖所有数据的综合架构。"a": true被转换为字符串以匹配"a": "hello".
使用的主要缺点spark.read.json()是 Spark 将扫描您的所有数据以导出模式。根据您拥有的数据量,这种开销可能很大。如果您知道您的所有 JSON 数据都具有一致的架构,那么继续并仅schema_of_json()针对单个元素使用就可以了。如果您有架构可变性但不想扫描所有数据,则可以设置samplingRatio为小于1.0调用中的值spark.read.json()以查看数据的子集。
以下是以下文档spark.read.json():Scala API / Python API
| 归档时间: |
|
| 查看次数: |
41366 次 |
| 最近记录: |