mar*_*tin 2 python mongodb pymongo elasticsearch
我试图用流无论从MongoDB中到Elasticsearch数据pymongo和Python客户端elasticsearch。
我已经设置了一个映射,我在这里报告了与感兴趣的领域相关的片段:
"updated_at": { "type": "date", "format": "dateOptionalTime" }
我的脚本使用 pymongo 从 MongoDB 中获取每个文档,并尝试将其索引到 Elasticsearch 中
from elasticsearch import Elasticsearch
from pymongo import MongoClient
mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
db = mongo_client['my_db']
collection = db['my_collection']
for doc in collection.find():
es_client.index(
index='index_name',
doc_type='my_type',
id=str(doc['_id']),
body=json.dumps(doc, default=json_util.default)
)
Run Code Online (Sandbox Code Playgroud)
我在运行它时遇到的问题是:
elasticsearch.exceptions.RequestError: TransportError(400, u'MapperParsingException[无法解析 [updated_at]]; 嵌套: ElasticsearchIllegalArgumentException[未知属性 [$date]];')
我相信问题的根源在于 pymongo 将字段updated_at序列化为datetime.datetime 对象,因为我可以看到是否在 for 循环中打印文档:
u'updated_at': datetime.datetime(2014, 8, 31, 17, 18, 13, 17000)
这与 Elasticsearch 查找映射中指定的日期类型的对象相冲突。
任何想法如何解决这个问题?
您走对了路,您的 Pythondatetime需要序列化为符合 ISO 8601 的日期字符串。因此,您需要CustomEncoder在json.dumps()通话中添加一个。首先,将您声明CustomEncoder为JSONEncoder将处理datetime和time属性转换的子类,但将其余部分委托给其超类:
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime('%Y-%m-%dT%H:%M:%S%z')
if isinstance(obj, time):
return obj.strftime('%H:%M:%S')
if hasattr(obj, 'to_json'):
return obj.to_json()
return super(CustomEncoder, self).default(obj)
Run Code Online (Sandbox Code Playgroud)
然后您可以在json.dumps通话中使用它,如下所示:
...
body=json.dumps(doc, default=json_util.default, cls=CustomEncoder)
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11782 次 |
| 最近记录: |