JH.*_*Lee 1 json python-2.7 apache-spark apache-spark-sql pyspark
我正在尝试将多行合并为一列,作为 spark 数据帧(spark 1.6.1)中的有效 json 格式。然后我希望它存储在 mysql 表中。
我的原始火花数据框如下所示:
|user_id |product_id|price |
|A |p1 |3000 |
|A |p2 |1500 |
|B |P1 |3000 |
|B |P3 |2000 |
Run Code Online (Sandbox Code Playgroud)
我想像这样转换上表:
|user_id |contents_json
|A |{(product_id:p1, price:3000), (product_id:p2, price:1500)}
|B |{{product_id:p1, price:3000), (product_id:p3, price:2000)}
Run Code Online (Sandbox Code Playgroud)
然后把上面的表放到mysql表中。
这是完全相反的爆炸方式,但我找不到正确的方法。
我假设您正在寻找下面显示的 JSON 输出。
from pyspark.sql.functions import col, collect_list, struct
df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])
Run Code Online (Sandbox Code Playgroud)
> Spark2.0
df1 = df.\
groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()
Run Code Online (Sandbox Code Playgroud)
火花1.6
zipCols = psf.udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("product_id", StringType()),
StructField("price", IntegerType())
]))
)
df1 = df.\
groupBy("user_id").agg(
zipCols(
collect_list(col("product_id")),
collect_list(col("price"))
).alias("contents_json")
)
Run Code Online (Sandbox Code Playgroud)
for row in df1.toJSON().collect():
print row
Run Code Online (Sandbox Code Playgroud)
输出是:
{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
3085 次 |
| 最近记录: |