Ank*_*kur 3 join cross-join apache-spark apache-spark-dataset
我已经在 Spark 用户论坛上发布了这个问题,但没有收到回复,所以在这里再次询问。
我们有一个用例,需要进行笛卡尔连接,但由于某种原因,我们无法让它与数据集 API 一起工作。
我们有两个数据集:
- 一个包含 2 个字符串列的数据集,例如 c1、c2。这是一个小型数据集,约有 100 万条记录。这两列都是 32 个字符的字符串,因此应小于 500 mb。
我们广播这个数据集
- 另一个数据集稍大一些,有约 1000 万条记录
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
Run Code Online (Sandbox Code Playgroud)
如果我使用 RDD api 实现它,在 ds1 中广播数据,然后在 ds2 中过滤数据,那么它工作得很好。
我已确认广播成功。
2019-02-14 23:11:55 INFO CodeGenerator:54 - 代码在 10.469136 ms 内生成 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 开始读取广播变量 29 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 读取广播变量 29 花费了 6 毫秒 2019-02-14 23:11:56 INFO CodeGenerator:54 - 在 11.280087 毫秒内生成的代码
查询计划:
== 物理计划 ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 <= c11#13) && (c11#13 <= c2#69))
:- *Project []
: +- *Filter isnotnull(_c0#0 )
: +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] 批处理: false, 格式: CSV, 位置: InMemoryFileIndex[], PartitionFilters: [] , PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string,_c4:string,_c5:string> +
- BroadcastExchange IdentityBroadcastMode
+- *项目 [c1#68 , c2#69]
+- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
+- *FileScan csv [c1#68,c2#69] 批处理: false,格式:CSV,位置:InMemoryFileIndex[ ],PartitionFilters:[],PushedFilters:[IsNotNull(c1),IsNotNull(c2)],ReadSchema:结构
那么阶段就不会进步。
我更新了代码以使用广播 ds1,然后在 ds2 的 mapPartitions 中进行连接。
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)
Run Code Online (Sandbox Code Playgroud)
然后在mapPartitions方法中使用这个rangeBC来识别ds2中每一行所属的范围,并且这个作业在3小时内完成,而另一个作业甚至在24小时后也没有完成。这意味着查询优化器没有执行我希望它执行的操作。
我究竟做错了什么?任何指示都会有所帮助。谢谢你!
小智 5
我最近遇到了这个问题,发现 Spark 在交叉连接大型数据帧时有奇怪的分区行为。如果您的输入数据帧包含几百万条记录,则交叉连接的数据帧的分区等于输入数据帧分区的乘积,即
crossJoinDF 的分区 = (ds1 的分区) * (ds2 的分区)。
如果 ds1 或 ds2 包含大约数百个分区,则交叉连接数据帧的分区范围约为 10,000。这些分区太多,导致管理许多小任务的开销过大,使得交叉连接数据帧上的任何计算(在您的情况下是过滤器)运行速度非常慢。
那么如何使计算速度更快呢?首先检查这是否确实是您的问题:
scala> val crossJoinDF = ds2.crossJoin(ds1)
# This should return immediately because of spark lazy evaluation
scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size
Run Code Online (Sandbox Code Playgroud)
检查交叉连接数据帧上的分区数。如果 crossJoinDFPartitions > 10,000,那么您确实遇到了同样的问题,即交叉连接的数据框有太多分区。
为了使交叉连接数据帧上的操作更快,请减少输入数据帧上的分区数量。例如:
scala> val ds1 = ds1.repartition(40)
scala> ds1.rdd.partitions.size
res80: Int = 40
scala> val ds2 = ds2.repartition(40)
scala> ds2.rdd.partitions.size
res81: Int = 40
scala> val crossJoinDF = ds1.crossJoin(ds2)
scala> crossJoinDF.rdd.partitions.size
res82: Int = 1600
scala> crossJoinDF.count()
Run Code Online (Sandbox Code Playgroud)
该count()操作应该导致交叉连接的执行。现在计数应该会在合理的时间内返回。您选择的确切分区数量取决于集群中可用的核心数量。
这里的关键要点是确保交叉连接的数据框具有合理数量的分区(<< 10,000)。您可能还会发现这篇文章很有用,它更详细地解释了这个问题。
| 归档时间: |
|
| 查看次数: |
4258 次 |
| 最近记录: |