我想并行运行 SQL 查询,并且能够将并行级别控制为 8 个查询。现在,我正在做这段代码。这个想法是创建 8 个分区并允许执行器并行运行它们。
(1 to 8).toSeq.toDF.repartition(8) // 8 partitions
.rdd.mapPartitions(
x => {
val conn = createConnection()
x.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
}
}
conn.close()
x
}).take(1)
Run Code Online (Sandbox Code Playgroud)
问题是 8 个查询是一一运行的。
我应该如何继续让查询运行 8 by 8 ?
当你这样做时
val df = (1 to 8).toSeq.toDF.repartition(8)
Run Code Online (Sandbox Code Playgroud)
这不会创建 8 个分区,每个分区有 1 条记录。如果您检查此数据框(例如参见/sf/answers/3222282031/),那么您会得到:
+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 4|
| 7| 4|
+----------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
因此,您将只有 2 个非空分区,因此您将拥有最大 2 倍并行度(我在这里询问过这个问题:How does Round Robin Partitioning in Spark work?)
要制作相同大小的分区,您最好使用
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
Run Code Online (Sandbox Code Playgroud)
代替
(1 to 8).toSeq.toDF.repartition(8).rdd
Run Code Online (Sandbox Code Playgroud)
第一个选项为每个分区提供 1 条记录,第二个选项则不然,因为它使用循环分区
附带说明一下,当您这样做时x.foreach,thenx将被消耗(迭代器只能遍历一次),因此如果您返回,x您将始终得到一个空迭代器。
所以你的最终代码可能如下所示:
+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 4|
| 7| 4|
+----------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
除了使用mapPartitions(这是惰性的)之外,您还可以使用foreachPartition,这是非惰性的
由于每个分区只有 1 条记录,因此迭代分区并不是真正有益,您也可以只使用普通的foreach:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2371 次 |
| 最近记录: |