在Spark中的RDD上执行分组,并将每个分组写为单独的Parquet文件

Ala*_*tos 2 java apache-spark parquet apache-spark-sql

我的内存中有RDD。我想使用一些任意函数对RDD进行分组,然后将每个单独的组写为单独的Parquet文件。

例如,如果我的RDD由以下形式的JSON字符串组成:

{"type":"finish","resolution":"success","csr_id": 214}
{"type":"create","resolution":"failure","csr_id": 321}
{"type":"action","resolution":"success","csr_id": 262}
Run Code Online (Sandbox Code Playgroud)

我想按“ type”属性对JSON字符串进行分组,并将具有相同“ type”的每组字符串写入同一Parquet文件。

我可以看到DataFrame API可以按以下方式写出Parquet文件(例如,如果RDD由JSON字符串组成):

final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);
Run Code Online (Sandbox Code Playgroud)

但是,这意味着整个DataFrame都将写入Parquet文件,因此Parquet文件将包含具有“ type”属性的不同值的记录。

Dataframe API还提供了groupBy函数:

final GroupedData groupedData = dataFrame.groupBy(this::myFunction);
Run Code Online (Sandbox Code Playgroud)

但是GroupedData API似乎没有提供任何功能来将每个组写到单个文件中。

有任何想法吗?

zer*_*323 7

您不能写入,GroupedData但是可以在写入时对数据进行分区:

dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")
Run Code Online (Sandbox Code Playgroud)

每种类型都将以${column}=${value}格式写入其自己的目录。这些可以单独加载:

sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// |   262|   success|
// +------+----------+
Run Code Online (Sandbox Code Playgroud)