How to query an Elasticsearch index using PySpark and DataFrames

Geo*_*kis 7 dataframe elasticsearch pyspark

Elasticsearch's documentation only covers loading a complete index into Spark:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Loads every document under index/type -- there is no way to pass a query here.
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

How can I use PySpark to run a query against an Elasticsearch index and load the results into Spark as a DataFrame?

Eya*_*ari 5

Below is how I do it.

General environment setup and command:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

Code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""

# Connection settings and query for the ES-Hadoop input format
es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

# Read the index as an RDD of (document-id, document-fields) pairs
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

# Each RDD element is a (document-id, field-dict) tuple
sqlContext.createDataFrame(es_rdd).collect()
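
Alternatively (a sketch of my own, not part of the original answer): elasticsearch-hadoop also ships a Spark SQL data source, and the same es.* settings, including es.query, can be passed as reader options, which skips the RDD step entirely:

# Reuses the query string q and sqlContext from above. es.query is a
# documented elasticsearch-hadoop setting; treat this variant as a sketch.
df = (sqlContext.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .option("es.query", q)
      .load("titanic/passenger"))
df.printSchema()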

You can also define the DataFrame columns explicitly. For more details, refer here.
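
For example, a minimal sketch of mapping the (document-id, field-dict) pairs into Rows with named columns (the field names label and age are hypothetical):

from pyspark.sql import Row

# Map each (document-id, field-dict) pair to a Row with named columns.
# The field names "label" and "age" are hypothetical examples.
rows = es_rdd.map(lambda kv: Row(id=kv[0],
                                 label=kv[1].get("label"),
                                 age=kv[1].get("age")))
df = sqlContext.createDataFrame(rows)
df.printSchema()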

Hope it helps!