当前pyspark格式化logFile,然后加载redshift。
分析有关以json格式输出的logFile的每个项目,添加一个项目,然后将其加载到Redshift中。但是,某些项目的格式因每种类型而异。?对于同一项目,事先应用Shcema。即使按原样输出,也会输入转义字符?有没有办法动态创建架构信息,并且输出jsonfile没有转义符?
- 环境 -
- spark 2.4.0
- python version 2.7.15
Run Code Online (Sandbox Code Playgroud)
-DataFrame-
>> df.printSchema()
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
>> df.show(2,False)
+------+------------------------------------------------------------+
|Name |d |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2] |
+------+------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
-模式?对于普通项目?-
>> print(json.dumps(schema.jsonValue(), indent=2))
{
"fields": [
{
"metadata": {},
"type": "string",
"name": "Name",
"nullable": false
},
{
"metadata": {},
"type": {
"keyType": "string",
"type": "map",
"valueType": "string",
"valueContainsNull": true
},
"name": "d",
"nullable": false
}
],
"type": "struct"
}
Run Code Online (Sandbox Code Playgroud)
-代码-
from pyspark.sql.types import *
rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
-输出json文件-
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}
Run Code Online (Sandbox Code Playgroud)
-输出json文件(理想)-
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}
Run Code Online (Sandbox Code Playgroud)
我试图使用pyspark.sql.functions的schema_of_json()和from_json(),但是它不起作用。(schema_of_json只能接受字符文字)
-试验结果-
from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"
Run Code Online (Sandbox Code Playgroud)
简短的答案是否定的,没有办法动态推断每行的架构并最终得到一个列,其中不同的行具有不同的架构。
然而,有一种方法可以输出你想要的 json 字符串,并将不同的 json 协调成一个通用的、类型丰富的模式
如果允许的话,速度会非常慢,但更重要的是不允许,因为它破坏了 SparkSQL 一致运行的关系模型。
数据框由列(字段)组成,并且列只有一种数据类型;数据类型代表整个列。鉴于 Python 的性质,它在 Pyspark 中并不严格执行,但它在运行时很重要,因此该语句仍然适用。
在你的例子中,如果你想用类似的东西来投影属性,那么阿尔弗雷德和安布尔都必须存在。至少,即使没有值,该字段的元数据也必须存在。执行引擎需要快速知道路径是否无效,以避免对每一行进行无意义的扫描。Cityd.Body.City
在单列中协调多种类型的一些方法是(我确信还有更多我想不到的方法):
在这种情况下,我喜欢 (1),但 (4) 作为寻找通用模式的临时步骤可能是有效的。
您的示例“常见”json 架构更像是选项 (3)。在地图内,您称为“d”(我猜是因为它是一个字典?),如果不扫描数据,则无法获得有关字段的信息。
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)
我意识到这只是添加包含 的新列的临时步骤Body,但要做到这一点,您必须将该映射中的所有可能的键枚举到更有用的模式中。
通用(通用)模式不是 的通用映射string -> string,我认为它更有用,如下所示。它接近您最初尝试的内容,但不是动态的,并且对两行都有效。请注意,这是所有属性的nullable默认值True
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)
Body.City现在,您可以通过选择轻松访问,d.Body.City而不必担心哪些行有城市。
对于下一步,您可以将其恢复为 json 字符串
df = df.withColumn("Body", to_json("d.Body"))
Run Code Online (Sandbox Code Playgroud)
您也可以将其与上一步结合起来
df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
Run Code Online (Sandbox Code Playgroud)
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyAttributes: struct (nullable = true)
| |-- Body: string (nullable = true)
| |-- BodyType: integer (nullable = true)
|-- Body: string (nullable = true)
df.show(2, False)
+------+---------------------------------------+--------------------------------+
|Name |BodyAttributes |Body |
+------+---------------------------------------+--------------------------------+
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2] |{"Weight":80,"Height":176} |
+------+---------------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)
请注意,当将其转换回 json 字符串时,那些 NULL 值消失了。现在它也是一个 jsonstring,很容易根据需要写入文件。
如果您这样做是为了分析、报告或其他目的而访问数据的过程的一部分,我会这样做
schema = StructType([
StructField('Name',StringType(), False),
StructField(
'd',
StructType([
StructField("Body", StringType()),
StructField("BodyType", IntegerType())
])
)
])
df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
"Body",
from_json("d.Body", schema_body)
).withColumn(
"BodyType",
col("d.BodyType")
).drop("d")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
|-- BodyType: integer (nullable = true)
df.show(2, False)
+------+---------------------+--------+
|Name |Body |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1 |
|Alfred|[null,null,80,176] |2 |
+------+---------------------+--------+
Run Code Online (Sandbox Code Playgroud)
然后您可以选择Body.City, Body.Country,Body.Weight, Body.Height`
您可以再前进一步,但这实际上取决于这些可能的 Body 键有多少以及它的稀疏程度。
df = df.withColumn(
"City", col("Body.City")
).withColumn(
"Country", col("Body.Country")
).withColumn(
"Weight", col("Body.Weight")
).withColumn(
"Height", col("Body.Height")
).drop("Body")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyType: integer (nullable = true)
|-- City: string (nullable = true)
|-- Country: string (nullable = true)
|-- Weight: integer (nullable = true)
|-- Height: integer (nullable = true)
df.show(2, False)
+------+--------+------+-------+------+------+
|Name |BodyType|City |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1 |Oregon|US |null |null |
|Alfred|2 |null |null |80 |176 |
+------+--------+------+-------+------+------+
Run Code Online (Sandbox Code Playgroud)