Spark:使用ElasticSearch索引优化联接

Dmi*_*uev 5 elasticsearch apache-spark

因此,我正在学习通过Apache Spark从ElasticSearch中获取数据。假设我已连接到具有“用户”索引的ElasticSearch。

sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
Run Code Online (Sandbox Code Playgroud)

说明(usersES)向我展示了这一点:

==身体计划==

扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource-> users / user),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid #148]

当我使用过滤器时:

usersES.filter(usersES.uid==1566324).explain()
Run Code Online (Sandbox Code Playgroud)

==物理计划==过滤器(uid#203L = 1566324)+-扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource-> users / user),org.apache.spark.sql.SQLContext @ 6c78e806,无)[大约#145,活动#146,约会(bdate)#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]

如您所见,Spark优雅地将过滤器推送到ElasticSearch,使索引搜索既快速又舒适。

但是,当我尝试将usersES与另一个数据帧结合在一起时,总是会遇到相同的问题: Spark会扫描整个ElasticSearch索引,而不是推送我提供的任何过滤器。 例如:

a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
Run Code Online (Sandbox Code Playgroud)

显示:

SortMergeJoin [id#210L],[uid#203L]:-排序[id#210L ASC],false,0:+-TungstenExchange hashpartitioning(id#210L,200),无:+-ConvertToUnsafe:+-扫描ExistingRDD [id #210L] +-排序[uid#203L ASC],false,0 +-TungstenExchange hashpartitioning(uid#203L,200),无+-ConvertToUnsafe +-扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource- >用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[关于#145,活动#146,生日日期147,uid#148]

请告诉我,是否可以在联接内部的Elasticsearch中推送过滤器?

eli*_*sah 4

这是预期的行为,是的,elaticsearch-hadoop 连接器支持下推谓词,但当您加入时没有推送。

这是因为连接操作不知道数据帧中键的分区方式。

默认情况下,此操作将对两个数据帧的所有键进行哈希处理,将具有相同键哈希的所有元素通过网络发送到同一台机器,然后在该机器上将具有相同键的元素连接在一起。

这就是为什么您可以在没有谓词被下推的情况下获得执行计划。

编辑:连接器似乎从 2.1 版开始就支持IN子句。如果您的 DataFrame a 不大,您应该使用它。