zet*_*ime 7 join bucket apache-spark hive-metastore
在测试生产用例时,我创建并保存了(使用Hive Metastore)这样的表:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
Run Code Online (Sandbox Code Playgroud)
我正在运行这样的查询(以伪代码)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
Run Code Online (Sandbox Code Playgroud)
常识说,这种连接应该简单地通过没有任何交换的排序合并连接来完成。但是spark进行了交流然后加入。
即使对于这个特定的用例,我也可以按两个键进行存储,由于其他一些用例,我需要按key1进行存储。当我使用这样的单个键进行(更简单的)连接时:
table1.join(table2, [“key1”])
Run Code Online (Sandbox Code Playgroud)
它按预期方式工作(即不进行任何排序的合并合并)。
现在,如果要过滤,我对这些表进行了优化联接,如下所示:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
Run Code Online (Sandbox Code Playgroud)
它恢复为交换,然后加入。
当联接密钥是bucketBy密钥的超集时,如何说服火花不要进行交换?
注意:
我知道的一个技巧是,如果我将不等式检查改写为等式检查,则火花不会洗牌。
(x == y)也可以表示为((x> = y)&(x <= y))。如果我在上一个示例中应用了两个这样的过滤器:
.filter(table1.col(“ key2”)> = table2.col(“ key2”))
.filter(table1.col(“ key2”)<= table2.col(“ key2”))
它将继续使用sort-merge join,而不会进行交换,但这不是解决方案,这是一个hack。
根据一些研究和探索,这似乎是最简单的解决方案:
\n\n在此示例的基础上:
\n\ntable1.join(table2, [\xe2\x80\x9ckey1\xe2\x80\x9d])\n .filter(table1.col(\xe2\x80\x9ckey2\xe2\x80\x9d) == table2.col(\xe2\x80\x9ckey2\xe2\x80\x9d))\nRun Code Online (Sandbox Code Playgroud)\n\n不使用equalTo (==)Spark,而是实现自定义MyEqualTo(通过委托给 SparkEqualTo实现就可以了)似乎可以解决问题。这样,spark 不会优化(!)连接,它只会将过滤器拉入 SortMergeJoin 中。
类似地,连接条件也可以这样形成:
\n\n(table1.col(\xe2\x80\x9ckey1\xe2\x80\x9d) == table2.col(\xe2\x80\x9ckey1\xe2\x80\x9d)) AND\ntable1.col(\xe2\x80\x9ckey2\xe2\x80\x9d).myEqualTo(table2.col(\xe2\x80\x9ckey2\xe2\x80\x9d))\nRun Code Online (Sandbox Code Playgroud)\n