Jon*_*Jon 7 json pyspark databricks azure-databricks
我在从 databricks 中的 Cosmos DB 读取项目时遇到一些问题,它似乎将 JSON 作为字符串值读取,并且在将数据从其中获取到列时遇到一些问题。
我有一个名为 ProductRanges 的列,其中连续包含以下值:
[ {
"name": "Red",
"min": 0,
"max": 99,
"value": "Order More"
},
{
"name": "Amber",
"min": 100,
"max": 499,
"value": "Stock OK"
},
{
"name": "Green",
"min": 500,
"max": 1000000,
"value": "Overstocked"
}
]
Run Code Online (Sandbox Code Playgroud)
在 Cosmos DB 中,JSON 文档是有效的,在导入数据时,数据帧中的数据类型是字符串,而不是我期望的 JSON 对象/结构。
我希望能够计算“名称”出现的次数,并迭代它们以获取最小值、最大值和值项,因为我们可以拥有的范围数可以超过 3。我已经尽管在 stackoverflow 和其他地方发表了一些帖子,但仍停留在格式上。我尝试使用爆炸并读取基于列值的模式,但它确实说“在无效文档中”,认为这可能是由于 Pyspark 在开始和结束时需要 {},但甚至将其连接到来自 cosmos db 的 SQL 查询最终仍然是字符串的数据类型。
任何指示将不胜感激
我看到您从 Azure CosmosDB 检索 JSON 文档并将其转换为 PySpark DataFrame,但嵌套的 JSON 文档或数组无法按预期转换为 DataFrame 列中的 JSON 对象,因为模块中没有定义 JSON 类型pyspark.sql.types,如下。
PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame当我试图解决它时,我搜索了一个适合您当前情况的文档,甚至与您想要的相同。
上面的文档展示了如何使用ArrayType、StructType和StructField其他基本 PySpark 数据类型将列中的 JSON 字符串转换为组合数据类型,可以通过定义列架构和 UDF 在 PySpark 中更轻松地处理组合数据类型。
这是示例代码的摘要。希望能帮助到你。
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
Run Code Online (Sandbox Code Playgroud)
JSON 通过 sqlContext 读入数据帧。输出是:
+------+--------------------+
|attr_1| attr_2|
+------+--------------------+
| 1|[{"a":1,"b":1},{"...|
| 2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
attr_2然后,通过定义列模式和 UDF来转换列。
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
Run Code Online (Sandbox Code Playgroud)
输出如下:
+------+--------------+
|attr_1| attr_2|
+------+--------------+
| 1|[[1,1], [2,2]]|
| 2|[[3,3], [4,4]]|
+------+--------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
21108 次 |
| 最近记录: |