Dmi*_*uev 5 elasticsearch apache-spark
因此,我正在学习通过Apache Spark从ElasticSearch中获取数据。假设我已连接到具有“用户”索引的ElasticSearch。
sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
Run Code Online (Sandbox Code Playgroud)
说明(usersES)向我展示了这一点:
==身体计划==
扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource-> users / user),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid #148]
当我使用过滤器时:
usersES.filter(usersES.uid==1566324).explain()
Run Code Online (Sandbox Code Playgroud)
==物理计划==过滤器(uid#203L = 1566324)+-扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource-> users / user),org.apache.spark.sql.SQLContext @ 6c78e806,无)[大约#145,活动#146,约会(bdate)#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]
如您所见,Spark优雅地将过滤器推送到ElasticSearch,使索引搜索既快速又舒适。
但是,当我尝试将usersES与另一个数据帧结合在一起时,总是会遇到相同的问题: Spark会扫描整个ElasticSearch索引,而不是推送我提供的任何过滤器。 例如:
a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
Run Code Online (Sandbox Code Playgroud)
显示:
SortMergeJoin [id#210L],[uid#203L]:-排序[id#210L ASC],false,0:+-TungstenExchange hashpartitioning(id#210L,200),无:+-ConvertToUnsafe:+-扫描ExistingRDD [id #210L] +-排序[uid#203L ASC],false,0 +-TungstenExchange hashpartitioning(uid#203L,200),无+-ConvertToUnsafe +-扫描ElasticsearchRelation(Map(es.nodes-> mynode,es.resource- >用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[关于#145,活动#146,生日日期147,uid#148]
请告诉我,是否可以在联接内部的Elasticsearch中推送过滤器?
这是预期的行为,是的,elaticsearch-hadoop 连接器支持下推谓词,但当您加入时没有推送。
这是因为连接操作不知道数据帧中键的分区方式。
默认情况下,此操作将对两个数据帧的所有键进行哈希处理,将具有相同键哈希的所有元素通过网络发送到同一台机器,然后在该机器上将具有相同键的元素连接在一起。
这就是为什么您可以在没有谓词被下推的情况下获得执行计划。
编辑:连接器似乎从 2.1 版开始就支持IN子句。如果您的 DataFrame a 不大,您应该使用它。
| 归档时间: |
|
| 查看次数: |
834 次 |
| 最近记录: |