Ala*_*tos 2 java apache-spark parquet apache-spark-sql
我的内存中有RDD。我想使用一些任意函数对RDD进行分组,然后将每个单独的组写为单独的Parquet文件。
例如,如果我的RDD由以下形式的JSON字符串组成:
{"type":"finish","resolution":"success","csr_id": 214}
{"type":"create","resolution":"failure","csr_id": 321}
{"type":"action","resolution":"success","csr_id": 262}
Run Code Online (Sandbox Code Playgroud)
我想按“ type”属性对JSON字符串进行分组,并将具有相同“ type”的每组字符串写入同一Parquet文件。
我可以看到DataFrame API可以按以下方式写出Parquet文件(例如,如果RDD由JSON字符串组成):
final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);
Run Code Online (Sandbox Code Playgroud)
但是,这意味着整个DataFrame都将写入Parquet文件,因此Parquet文件将包含具有“ type”属性的不同值的记录。
Dataframe API还提供了groupBy函数:
final GroupedData groupedData = dataFrame.groupBy(this::myFunction);
Run Code Online (Sandbox Code Playgroud)
但是GroupedData API似乎没有提供任何功能来将每个组写到单个文件中。
有任何想法吗?
您不能写入,GroupedData但是可以在写入时对数据进行分区:
dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")
Run Code Online (Sandbox Code Playgroud)
每种类型都将以${column}=${value}格式写入其自己的目录。这些可以单独加载:
sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// | 262| success|
// +------+----------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2582 次 |
| 最近记录: |