具有多线程的 ElasticSearch Scroll API

Alw*_*nny 2 multithreading elasticsearch elasticsearch-py

首先,我想让大家知道我知道ElasticSearch Scroll API如何工作的基本工作逻辑。要使用Scroll API,首先,我们需要使用一些滚动值(如1m )调用search方法,然后它将返回一个_scroll_id,该_scroll_id将用于 Scroll 上的下一个连续调用,直到所有文档在循环中返回。但问题是我只想在多线程的基础上使用相同的进程,而不是串行。例如:

如果我有 300000 个文档,那么我想以这种方式处理/获取文档

  • 第一个线程将处理初始100000 个文档
  • 第二个线程将处理接下来的100000 个文档
  • 第三个线程将处理剩余的100000 个文档

所以我的问题是,我没有找到任何方法来设置滚动 API 上的from值,如何使用线程使滚动过程更快。不要以序列化的方式处理文档。

我的示例 python 代码

if index_name is not None and doc_type is not None and body is not None:
   es = init_es()
   page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
   sid = page['_scroll_id']
   scroll_size = page['hits']['total']

   # Start scrolling
   while (scroll_size > 0):

       print("Scrolling...")
       page = es.scroll(scroll_id=sid, scroll='30s')
       # Update the scroll ID
       sid = page['_scroll_id']

       print("scroll id: " + sid)

       # Get the number of results that we returned in the last scroll
       scroll_size = len(page['hits']['hits'])
       print("scroll size: " + str(scroll_size))

       print("scrolled data :" )
       print(page['aggregations'])
Run Code Online (Sandbox Code Playgroud)

Pau*_*aul 8

你试过切片卷轴吗?根据链接的文档:

对于返回大量文档的滚动查询,可以将滚动拆分为可以独立使用的多个切片。

每个滚动都是独立的,可以像任何滚动请求一样并行处理。

我自己没有使用过这个(我需要处理的最大结果集是 ~50k 文档),但这似乎是你正在寻找的。