如何将行合并到 spark 数据帧的列中作为有效的 json 将其写入 mysql

JH.*_*Lee 1 json python-2.7 apache-spark apache-spark-sql pyspark

我正在尝试将多行合并为一列,作为 spark 数据帧(spark 1.6.1)中的有效 json 格式。然后我希望它存储在 mysql 表中。

我的原始火花数据框如下所示:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |
Run Code Online (Sandbox Code Playgroud)

我想像这样转换上表:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{{product_id:p1, price:3000), (product_id:p3, price:2000)} 
Run Code Online (Sandbox Code Playgroud)

然后把上面的表放到mysql表中。

这是完全相反的爆炸方式,但我找不到正确的方法。

Pre*_*rem 6

我假设您正在寻找下面显示的 JSON 输出。

from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])
Run Code Online (Sandbox Code Playgroud)

> Spark2.0

df1 = df.\
    groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()
Run Code Online (Sandbox Code Playgroud)

火花1.6

zipCols = psf.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("product_id", StringType()),
      StructField("price", IntegerType())
  ]))
)

df1 = df.\
    groupBy("user_id").agg(
        zipCols(
            collect_list(col("product_id")), 
            collect_list(col("price"))
        ).alias("contents_json")
    )
Run Code Online (Sandbox Code Playgroud)


for row in df1.toJSON().collect():
    print row
Run Code Online (Sandbox Code Playgroud)

输出是:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
Run Code Online (Sandbox Code Playgroud)


希望这可以帮助!