我有一个从RDD创建的数据集,并尝试将它与另一个从我的Phoenix 表创建的数据集连接起来:
val dfToJoin = sparkSession.createDataset(rddToJoin)
val tableDf = sparkSession
.read
.option("table", "table")
.option("zkURL", "localhost")
.format("org.apache.phoenix.spark")
.load()
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")
Run Code Online (Sandbox Code Playgroud)
当我执行它时,似乎整个数据库表都被加载来进行连接。
有没有办法进行这样的连接,以便在数据库上而不是在 spark 上完成过滤?
另外:dfToJoin比表小,我不知道这是否重要。
编辑:基本上我想将我的 Phoenix 表与通过 spark 创建的数据集连接起来,而不是将整个表提取到执行程序中。
Edit2:这是物理计划:
*Project [FEATURE#21, SEQUENCE_IDENTIFIER#22, TAX_NUMBER#23,
WINDOW_NUMBER#24, uniqueIdentifier#5, readLength#6]
+- *SortMergeJoin [FEATURE#21], [feature#4], Inner
:- *Sort [FEATURE#21 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(FEATURE#21, 200)
: +- *Filter isnotnull(FEATURE#21)
: +- *Scan PhoenixRelation(FEATURES,localhost,false)
[FEATURE#21,SEQUENCE_IDENTIFIER#22,TAX_NUMBER#23,WINDOW_NUMBER#24]
PushedFilters: [IsNotNull(FEATURE)], ReadSchema: …Run Code Online (Sandbox Code Playgroud)