Ana*_*and 7 apache-spark apache-spark-sql pyspark aws-glue
I have data like this:
{
"Id": "01d3050e",
"Properties": "{\"choices\":null,\"object\":\"demo\",\"database\":\"pg\",\"timestamp\":\"1581534117303\"}",
"LastUpdated": 1581530000000,
"LastUpdatedBy": "System"
}
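To see why the column arrives as a plain string: the Properties value is a JSON document that was itself serialized into a JSON string. The minimal sketch below uses only Python's standard json module on the record above (the variable names are illustrative). One round of parsing leaves Properties as a str. Only a second round produces the nested object, and that second round is the step Spark's from_json performs on a column.

```python
import json

# The record from the question, exactly as it appears in the raw input.
raw = r'''{
"Id": "01d3050e",
"Properties": "{\"choices\":null,\"object\":\"demo\",\"database\":\"pg\",\"timestamp\":\"1581534117303\"}",
"LastUpdated": 1581530000000,
"LastUpdatedBy": "System"
}'''

record = json.loads(raw)

# After the first parse, Properties is still just text.
props_text = record["Properties"]
print(type(props_text).__name__)  # str

# Parsing that text again yields the nested key/value object.
props = json.loads(props_text)
print(props)
```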
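Using awsglue, I want to relationalize the "Properties" column, but because its data type is string, that can't be done. Based on reading this blog, converting it to a struct might work -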
>>> df.show
<bound method DataFrame.show of DataFrame[Id: string, LastUpdated: bigint, LastUpdatedBy: string, Properties: string]>
>>> df.show()
+--------+-------------+-------------+--------------------+
| Id| LastUpdated|LastUpdatedBy| Properties|
+--------+-------------+-------------+--------------------+
|01d3050e|1581530000000| System|{"choices":null,"...|
+--------+-------------+-------------+--------------------+
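How can I unnest the "Properties" column into separate "choices", "object", "database" and "timestamp" columns, using the Relationalize transform or any UDF in PySpark?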
bla*_*hop 11
Use from_json, since the column Properties is a JSON string.
If the schema is the same for all records, you can convert it to a struct type by defining the schema, like this:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([StructField("choices", StringType(), True),
                     StructField("object", StringType(), True),
                     StructField("database", StringType(), True),
                     StructField("timestamp", StringType(), True)])
df.withColumn("Properties", from_json(col("Properties"), schema)).show(truncate=False)
#+--------+-------------+-------------+---------------------------+
#|Id |LastUpdated |LastUpdatedBy|Properties |
#+--------+-------------+-------------+---------------------------+
#|01d3050e|1581530000000|System |[, demo, pg, 1581534117303]|
#+--------+-------------+-------------+---------------------------+
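However, if the schema can change from one row to another, I suggest converting it to a Map type instead:
from pyspark.sql.types import MapType, StringType
df.withColumn("Properties", from_json(col("Properties"), MapType(StringType(), StringType()))).show(truncate=False)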
#+--------+-------------+-------------+------------------------------------------------------------------------+
#|Id |LastUpdated |LastUpdatedBy|Properties |
#+--------+-------------+-------------+------------------------------------------------------------------------+
#|01d3050e|1581530000000|System |[choices ->, object -> demo, database -> pg, timestamp -> 1581534117303]|
#+--------+-------------+-------------+------------------------------------------------------------------------+
You can then access the map elements with element_at (Spark 2.4+).
from pyspark.sql import functions as F

# assumes an existing SparkSession named spark (e.g. the pyspark shell)
data = [["01d3050e", "{\"choices\":null,\"object\":\"demo\",\"database\":\"pg\",\"timestamp\":\"1581534117303\"}", 1581530000000, "System"]]
df = spark.createDataFrame(data, ['Id', 'Properties', 'LastUpdated', 'LastUpdatedBy'])
df.show(truncate=False)
+--------+----------------------------------------------------------------------------+-------------+-------------+
|Id |Properties |LastUpdated |LastUpdatedBy|
+--------+----------------------------------------------------------------------------+-------------+-------------+
|01d3050e|{"choices":null,"object":"demo","database":"pg","timestamp":"1581534117303"}|1581530000000|System |
+--------+----------------------------------------------------------------------------+-------------+-------------+
There is no need for a UDF: the built-in functions are sufficient and are well optimized for big-data workloads.
# Drop braces and double quotes, turn ':' into ',' and split, giving a flat
# [key1, value1, key2, value2, ...] array; element_at is 1-based, so values sit at 2, 4, 6, 8.
result = df.withColumn("Properties", F.split(F.regexp_replace(F.regexp_replace("Properties", r'[{}"]', ""), ":", ","), ","))\
    .withColumn("choices", F.element_at("Properties", 2))\
    .withColumn("object", F.element_at("Properties", 4))\
    .withColumn("database", F.element_at("Properties", 6))\
    .withColumn("timestamp", F.element_at("Properties", 8).cast('long'))\
    .drop("Properties")
result.show()
result.printSchema()
+--------+-------------+-------------+-------+------+--------+-------------+
| Id| LastUpdated|LastUpdatedBy|choices|object|database| timestamp|
+--------+-------------+-------------+-------+------+--------+-------------+
|01d3050e|1581530000000| System| null| demo| pg|1581534117303|
+--------+-------------+-------------+-------+------+--------+-------------+
root
|-- Id: string (nullable = true)
|-- LastUpdated: long (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- choices: string (nullable = true)
|-- object: string (nullable = true)
|-- database: string (nullable = true)
|-- timestamp: long (nullable = true)
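To make the regex chain easier to follow, here is a plain-Python sketch using only the standard re and json modules. The helper name flatten_like_regex_answer is hypothetical. It applies the same substitutions and split to the sample Properties string, with the brace and quote removal merged into one pattern, and shows which positions element_at picks up. It also shows two limits of this approach. The JSON null ends up as the literal string 'null'. A value containing ':' or ',' shifts every later position, and a real JSON parser does not have that problem.

```python
import json
import re

props = '{"choices":null,"object":"demo","database":"pg","timestamp":"1581534117303"}'


def flatten_like_regex_answer(text):
    """Mimic the Spark chain: drop braces and quotes, turn ':' into ',', split on ','."""
    return re.sub(":", ",", re.sub(r'[{}"]', "", text)).split(",")


parts = flatten_like_regex_answer(props)
print(parts)
# Every second element (1-based positions 2, 4, 6, 8) is a value.
print(parts[1::2])  # ['null', 'demo', 'pg', '1581534117303']

# A value containing ':' shifts the later positions.
tricky = '{"choices":null,"object":"a:b","database":"pg","timestamp":"1581534117303"}'
print(flatten_like_regex_answer(tricky)[5])  # 'database', not 'pg'
print(json.loads(tricky)["database"])        # 'pg'
```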