The Elasticsearch documentation only covers loading a complete index into Spark:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
How can I run a query against an Elasticsearch index with pyspark so that only the matching documents are returned and loaded into Spark as a DataFrame?
Here is how I got it working.

General environment setup and launch command:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar
Code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"filtered": {
"filter": {
"exists": {
"field": "label"
}
},
"query": {
"match_all": {}
}
}
}
}"""
es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "titanic/passenger",
    "es.query": q
}
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

# Each element of es_rdd is a (document id, document fields) pair
sqlContext.createDataFrame(es_rdd).collect()
You can also define the DataFrame columns explicitly; see here for details.
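For example, a minimal sketch of one way to do that, building Rows from the document part of each pair (the field names `PassengerId` and `Survived` are just placeholders for fields assumed to exist in the titanic index):

from pyspark.sql import Row

# Keep only the document body (the value of each (id, document) pair)
# and turn it into Rows with an explicit set of columns.
# "PassengerId" and "Survived" are hypothetical field names here.
rows = es_rdd.map(lambda kv: Row(PassengerId=kv[1].get("PassengerId"),
                                 Survived=kv[1].get("Survived")))
df = sqlContext.createDataFrame(rows)
df.printSchema()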
Hope it helps!
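As a side note, depending on your elasticsearch-hadoop version you may also be able to pass the query straight to the DataFrame reader shown in the question. A sketch, assuming the connector accepts the `es.query` option on the reader:

# Same query string q as above; index/type follow the question's example.
df = (sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .option("es.query", q)
      .load("titanic/passenger"))
df.printSchema()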