dim*_*zak 5 elasticsearch apache-spark pyspark elasticsearch-hadoop
我不知道如何使用 Spark 中的 python 将数据帧写入 Elasticsearch。我从这里开始遵循步骤。
这是我的代码:
# Read file
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='true') \
.load('/vagrant/data/input/input.csv', schema = customSchema)
df.registerTempTable("data")
# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")
es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
Run Code Online (Sandbox Code Playgroud)
上面的代码给出了
原因:net.razorvine.pickle.PickleException:构造 ClassDict 时预期参数为零(对于 pyspark.sql.types._create_row)
我还从以下位置启动了脚本:
spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py以确保elasticsearch-hadoop已加载
对于初学者来说saveAsNewAPIHadoopFile,期望是成对RDD的(key, value),而在您的情况下,这可能只是偶然发生。同样的事情也适用于您声明的值格式。
我不熟悉 Elastic,但根据论点,您可能应该尝试类似的操作:
kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
由于 Elastic-Hadoop 提供 SQL 数据源,您还应该能够跳过它并直接保存数据:
df.write.format("org.elasticsearch.spark.sql").save(...)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3664 次 |
| 最近记录: |