Apache Spark 中按列分区到 S3

Aja*_*jay 3 hadoop scala mapreduce amazon-s3 apache-spark

有一个用例,我们想要从具有 JSON 的 S3 读取文件。然后,根据特定的 JSON 节点值,我们希望对数据进行分组并将其写入 S3。

我能够读取数据,但无法找到关于如何基于 JSON 密钥对数据进行分区然后上传到 S3 的好示例。任何人都可以提供任何示例或向我指出可以帮助我解决此用例的教程吗?

创建数据框后,我得到了数据的架构:

root
 |-- customer: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- expiryTime: long (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- programId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- startTime: long (nullable = true)
 |-- targetSets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- featured: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |-- asinId: string (nullable = true)
 |    |    |    |    |-- pk: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- reason: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- recommended: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

我想根据 customerId 列上的随机哈希对数据进行分区。但是当我这样做时:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
Run Code Online (Sandbox Code Playgroud)

它给出错误:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
Run Code Online (Sandbox Code Playgroud)

请让我知道我可以访问 customerId 列。

mrs*_*vas 5

让我们以数据集sample.json为例

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}
Run Code Online (Sandbox Code Playgroud)

现在开始用 Spark 破解它

val jsonDf = spark.read
  .format("json")
  .load("path/of/sample.json")

jsonDf.show()

+---------+-------+-----+-----+
|     CITY|CUST_ID|STATE|  ZIP|
+---------+-------+-----+-----+
| San Jose| 115734|   CA|95106|
|Allentown| 115728|   PA|18101|
|Allentown| 115730|   PA|18101|
|San Mateo| 114728|   CA|94401|
| Somerset| 114726|   NJ| 8873|
+---------+-------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

然后按列对数据集进行分区"ZIP"并写入S3

jsonDf.write
  .partitionBy("ZIP")
  .save("s3/bucket/location/to/save")
  // one liner athentication to s3
  //.save("s3n://$accessKey:$secretKey" + "@" + s"$buckectName/location/to/save")
Run Code Online (Sandbox Code Playgroud)

注意:为了使此代码成功访问 S3,必须正确配置密钥。检查Spark/Hadoop 与 S3 集成的答案

编辑:解决方案:在架构中找不到分区列 customerId(根据评论)

customerId存在于结构内部customer,因此尝试提取customerIdthen do 分区。

df.withColumn("customerId", $"customer.customerId")
  .drop("customer")
  .write.partitionBy("customerId")
  .save("s3/bucket/location/to/save")
Run Code Online (Sandbox Code Playgroud)