Elasticsearch 无法将来自 pymongo 的日期时间字段解析为对象

mar*_*tin 2 python mongodb pymongo elasticsearch

我试图用流无论从MongoDB中到Elasticsearch数据pymongo和Python客户端elasticsearch

我已经设置了一个映射,我在这里报告了与感兴趣的领域相关的片段:

"updated_at": { "type": "date", "format": "dateOptionalTime" }

我的脚本使用 pymongo 从 MongoDB 中获取每个文档,并尝试将其索引到 Elasticsearch 中

from elasticsearch import Elasticsearch
from pymongo import MongoClient

mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
db = mongo_client['my_db']
collection = db['my_collection']

for doc in collection.find():
    es_client.index(
         index='index_name', 
         doc_type='my_type', 
         id=str(doc['_id']), 
         body=json.dumps(doc, default=json_util.default)
    )
Run Code Online (Sandbox Code Playgroud)

我在运行它时遇到的问题是:

elasticsearch.exceptions.RequestError: TransportError(400, u'MapperParsingException[无法解析 [updated_at]]; 嵌套: ElasticsearchIllegalArgumentException[未知属性 [$date]];')

我相信问题的根源在于 pymongo 将字段updated_at序列化为datetime.datetime 对象,因为我可以看到是否在 for 循环中打印文档:

u'updated_at': datetime.datetime(2014, 8, 31, 17, 18, 13, 17000)

这与 Elasticsearch 查找映射中指定的日期类型的对象相冲突。

任何想法如何解决这个问题?

Val*_*Val 5

您走对了路,您的 Pythondatetime需要序列化为符合 ISO 8601 的日期字符串。因此,您需要CustomEncoderjson.dumps()通话中添加一个。首先,将您声明CustomEncoderJSONEncoder将处理datetimetime属性转换的子类,但将其余部分委托给其超类:

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%dT%H:%M:%S%z')
        if isinstance(obj, time):
            return obj.strftime('%H:%M:%S')
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return super(CustomEncoder, self).default(obj)
Run Code Online (Sandbox Code Playgroud)

然后您可以在json.dumps通话中使用它,如下所示:

...
body=json.dumps(doc, default=json_util.default, cls=CustomEncoder)
...
Run Code Online (Sandbox Code Playgroud)