相关疑难解决方法(0)

Python Spark Dataframe 到 Elasticsearch

我不知道如何使用 Spark 中的 python 将数据帧写入 Elasticsearch。我从这里开始遵循步骤。

这是我的代码:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
Run Code Online (Sandbox Code Playgroud)

上面的代码给出了

原因:net.razorvine.pickle.PickleException:构造 ClassDict 时预期参数为零(对于 pyspark.sql.types._create_row)

我还从以下位置启动了脚本: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py以确保elasticsearch-hadoop已加载

elasticsearch apache-spark pyspark elasticsearch-hadoop

5
推荐指数
1
解决办法
3664
查看次数