如何在Spark中加入Elasticsearch?

Ter*_*ran 5 hadoop elasticsearch apache-spark pyspark

使用HTTP POST,以下脚本可以插入新字段createtime或更新lastupdatetime

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
    "createtime": "2015-09-16T18:00:00"
    "lastupdatetime": "2015-09-16T18:00",
}
}'
Run Code Online (Sandbox Code Playgroud)

但是在spark脚本中,设置后"es.write.operation": "upsert",我根本不知道如何插入createtime。只有es.update.script.*正式文件。所以,任何人都可以给我一个例子吗?

更新:就我而言,我想将Android设备的信息从日志中保存为一种 Elasticsearch类型,并将其首次出现时间设置为createtime。如果该设备再次出现,则我只会更新lastupdatetime,而保持createtime原样。

所以文档id是android ID,如果id存在,则update lastupdatetime,否则insert createtimelastupdatetime.So这里的设置是(在python中):

conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)
Run Code Online (Sandbox Code Playgroud)

我只是不知道如何在不存在的情况下插入字段id

Ter*_*ran 1

最后我得到了一个并不完美的解决方案:

  1. 添加createtime到所有源文档;
  2. 使用create方法保存到 es 并忽略已经创建的错误;
  3. 删除createtime字段;
  4. 用方法再次保存到es update

现在(2015-09-27),步骤2可以通过这个补丁来实现。