当联接密钥是bucketBy密钥的超集时,如何说服火花不要进行交换?

zet*_*ime 7 join bucket apache-spark hive-metastore

在测试生产用例时,我创建并保存了(使用Hive Metastore)这样的表:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
Run Code Online (Sandbox Code Playgroud)

我正在运行这样的查询(以伪代码)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)
Run Code Online (Sandbox Code Playgroud)

常识说,这种连接应该简单地通过没有任何交换的排序合并连接来完成。但是spark进行了交流然后加入。

即使对于这个特定的用例,我也可以按两个键进行存储,由于其他一些用例,我需要按key1进行存储。当我使用这样的单个键进行(更简单的)连接时:

table1.join(table2, [“key1”])
Run Code Online (Sandbox Code Playgroud)

它按预期方式工作(即不进行任何排序的合并合并)。

现在,如果要过滤,我对这些表进行了优化联接,如下所示:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))
Run Code Online (Sandbox Code Playgroud)

它恢复为交换,然后加入。

当联接密钥是bucketBy密钥的超集时,如何说服火花不要进行交换?

注意:

我知道的一个技巧是,如果我将不等式检查改写为等式检查,则火花不会洗牌。

(x == y)也可以表示为((x> = y)&(x <= y))。如果我在上一个示例中应用了两个这样的过滤器:

.filter(table1.col(“ key2”)> = table2.col(“ key2”))

.filter(table1.col(“ key2”)<= table2.col(“ key2”))

它将继续使用sort-merge join,而不会进行交换,但这不是解决方案,这是一个hack。

zet*_*ime 5

根据一些研究和探索,这似乎是最简单的解决方案:

\n\n

在此示例的基础上:

\n\n
table1.join(table2, [\xe2\x80\x9ckey1\xe2\x80\x9d])\n      .filter(table1.col(\xe2\x80\x9ckey2\xe2\x80\x9d) == table2.col(\xe2\x80\x9ckey2\xe2\x80\x9d))\n
Run Code Online (Sandbox Code Playgroud)\n\n

不使用equalTo (==)Spark,而是实现自定义MyEqualTo(通过委托给 SparkEqualTo实现就可以了)似乎可以解决问题。这样,spark 不会优化(!)连接,它只会将过滤器拉入 SortMergeJoin 中。

\n\n

类似地,连接条件也可以这样形成:

\n\n
(table1.col(\xe2\x80\x9ckey1\xe2\x80\x9d) == table2.col(\xe2\x80\x9ckey1\xe2\x80\x9d)) AND\ntable1.col(\xe2\x80\x9ckey2\xe2\x80\x9d).myEqualTo(table2.col(\xe2\x80\x9ckey2\xe2\x80\x9d))\n
Run Code Online (Sandbox Code Playgroud)\n

  • 我猜这会触发列反序列化为 jvm 对象?即我们必须在洗牌的痛苦或增加的反/序列化和垃圾收集之间做出选择? (2认同)