Pau*_*iou 2 java hadoop scala apache-spark
问题:我正在尝试重新分区数据集,以便在指定的整数列中具有相同编号的所有行都在同一个分区中.
工作原理:当我使用带有RDD的1.6 API(用Java)时,我使用了一个散列分区器,这可以按预期工作.例如,如果我为每一行打印此列的每个值的模数,则在给定分区中获得相同的模数(我通过手动读取saveAsHadoopFile保存的内容来读取分区).
使用最新的API无法正常工作
但是现在我正在尝试使用2.0.1 API(在Scala中)和具有重新分区方法的数据集,该方法需要多个分区和列,并将此DataSet保存为镶木地板文件.如果我在分区中查看给定此列的行未分区,结果就不一样了.
要保存分区,Dataset您可以使用以下任一方法:
DataFrameWriter.partitionBy - 自Spark 1.6起可用
df.write.partitionBy("someColumn").format(...).save()
Run Code Online (Sandbox Code Playgroud)DataFrameWriter.bucketBy - 自Spark 2.0起可用
df.write.bucketBy("someColumn").format(...).save()
Run Code Online (Sandbox Code Playgroud)使用也df.partitionBy("someColumn").write.format(...).save应该工作,但DatasetAPI不使用哈希码.它使用的MurmurHash结果将与HashParitionerRDD API中的结果不同,并且琐碎的检查(如您所描述的那样)将不起作用.
val oldHashCode = udf((x: Long) => x.hashCode)
// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599
val nonNegativeMode = udf((x: Int, mod: Int) => {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
})
val df = spark.range(0, 10)
val oldPart = nonNegativeMode(oldHashCode($"id"), lit(3))
val newPart = nonNegativeMode(hash($"id"), lit(3))
df.select($"*", oldPart, newPart).show
Run Code Online (Sandbox Code Playgroud)
+---+---------------+--------------------+
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)|
+---+---------------+--------------------+
| 0| 0| 1|
| 1| 1| 2|
| 2| 2| 2|
| 3| 0| 0|
| 4| 1| 2|
| 5| 2| 2|
| 6| 0| 0|
| 7| 1| 0|
| 8| 2| 2|
| 9| 0| 2|
+---+---------------+--------------------+
Run Code Online (Sandbox Code Playgroud)
一个可能的问题是,DataFrame编写者可以合并多个小文件以降低成本,因此来自不同分区的数据可以放在一个文件中.
| 归档时间: |
|
| 查看次数: |
2139 次 |
| 最近记录: |