将 elasticsearch 2.x 转储到 mongodb 并返回 ES 6.x

pra*_*ase 5 ruby elasticsearch sidekiq

这个问题更多的是理论而不是源代码。

我有一个 ES 2.x 节点,它有超过 1.2TB 的数据。我们有 40 多个索引,每个索引至少有 1 种类型。在这里,ES 2.x 被用作数据库而不是搜索引擎。用于将数据转储到 ES 2.x 的源丢失了。此外,数据未规范化,但单个 ES 文档具有多个嵌入文档。我们的目标是重新创建数据源,同时对其进行规范化。

我们正在计划的是:

  1. 从 ES 中检索数据,对其进行分析并将其转储到新的 mongodb 中到特定集合并维护数据之间的关系。IE。以标准化形式保存。
  2. 在新的 ES 6 节点上索引新的 mongo 数据。

我们使用的是 JRuby 9.1.15.0、Rails 5、Ruby 2.4 和 Sidekiq。

目前,我们正在从 ES 检索特定日期时间范围的数据。有时我们会收到 0 条记录,有时会收到 100000+ 条记录。问题是当我们收到大量记录时。

下面是一个示例脚本,当日期范围的数据很小时可以工作,但在数据很大时会失败。1.2TB/40 索引是平均索引大小

class DataRetrieverWorker
  include Sidekiq::Worker
  include Sidekiq::Status::Worker

  def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
    unless start_time || end_time
      client = ElasticSearchClient.instance.client
      last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
      start_time, end_time = unless last_retrieved_at
                               data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
                               first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
                               start_time = first_day.beginning_of_day
                               end_time = first_day.end_of_day
                             else
                               # retrieve for the next time slot. usually 24 hrs.
                               [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
                             end
      DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
    else
       # start scroll on the specified range and retrieve data.
       query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
       data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
      ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
      if ri
        DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
        return
      end
      ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
      if data['hits']['total'] > 0
        if data['hits']['total'] > 2000
          BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          end
        else
          data['hits']['hits'].each do |r|
            schedule(r)
            ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
          end
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            data['hits']['hits'].each do |r|
              schedule(r)
              ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
            end
          end
        end
      else
        DataRetrieverWorker.perform_async(indx_name, interval)
        return
      end
      DataRetrieverWorker.perform_at(indx_name, interval)
    end
  end

  private

  def schedule(data)
    DataPersisterWorker.perform_async(data)
  end
end
Run Code Online (Sandbox Code Playgroud)

问题:

  1. 从 ES 2.x 检索数据的理想方法应该是什么?我们通过日期范围检索数据,然后使用滚动 api 检索结果集。这是正确的吗?
  2. 当我们在特定时间范围内获得大结果时应该做什么。有时,我们会在几分钟的时间范围内获得 20000 多条记录。理想的方法应该是什么?
  3. sidekiq 是处理如此大量数据的正确库吗?
  4. 运行 sidekiq 的服务器的理想配置应该是什么?
  5. 使用日期范围是检索数据的正确方法吗?文件的数量变化很大。0 或 100000+。
  6. 有没有更好的方法可以给我 uinform 的记录数量,而不管时间范围如何?
  7. 我尝试使用独立于时间范围的滚动 api,但是对于具有 100cr 记录的索引,使用大小为 100 的滚动是否正确(调用 ES 的 api 有 100 个结果)?8.指数中的数据不断增加。没有任何文件被更新。

我们已经测试了我们的代码,它处理每个日期时间范围(比如 6 小时)的名义数据(比如 4-5k 文档)。我们还计划对数据进行分片。由于我们需要在某些集合中添加/更新记录时执行一些 ruby​​ 回调,因此我们将使用 Mongoid。在没有 mongoid 的情况下直接在 mongodb 中插入数据不是一种选择。

任何指针都会有所帮助。谢谢。

小智 0

  1. 从 ES 2.x 检索数据的理想方法应该是什么?我们通过日期范围检索数据,然后使用滚动 api 检索结果集。这是正确的吗?

ES中的数据是不断增加的吗?

  1. 当我们在特定时间范围内获得大量结果时应该做什么。有时,我们会在几分钟的时间范围内获得 20000 多条记录。理想的方法应该是什么?

您正在使用滚动 api,这是一个很好的方法。你可以尝试一下ES的Sliced Scroll API。

  1. sidekiq 是处理如此大量数据的合适库吗?

是的,sidekiq 很好,可以处理这么多数据。

  1. 运行 sidekiq 的服务器的理想配置应该是什么?

您当前运行 sidekiq 的服务器配置是什么?

  1. 使用日期范围是检索数据的正确方法吗?文件的数量差异很大。0 或 100000+。

您一次无法保存 100000 多个结果。您正在使用滚动 API 分块处理它们。如果 ES 中没有继续添加数据,则使用带有match_all: {}滚动 api 的查询。如果不断添加数据,那么日期范围是很好的方法。

  1. 有没有更好的方法可以为我提供统一数量的记录,无论时间范围如何?

是的,如果您在不使用日期范围的情况下使用。使用scroll api 扫描从0 到最后的所有文档。

  1. 我尝试独立于时间范围使用滚动 api,但是对于具有 100cr 记录的索引,使用大小为 100 的滚动是否正确(对 ES 的 api 调用有 100 个结果)?

您可以增加滚动大小,因为 mongodb 支持批量插入文档。 MongoDB 批量插入

以下几点可能会解决您的问题:

处理前一批后清除scroll_id可能会提高性能。

  1. 滚动请求进行了优化,当排序顺序为 _doc 时,滚动请求的速度会更快。如果您想迭代所有文档而不考虑顺序,这是最有效的选择。

  2. 滚动参数告诉 Elasticsearch 应该使搜索上下文保持活动状态多长时间。它的值(例如1m)不需要足够长来处理所有数据,它只需要足够长来处理上一批结果。每个滚动请求都会设置一个新的到期时间。

  3. 当超过滚动超时时,搜索上下文将自动删除。然而,保持滚动打开是有成本的(稍后在性能部分讨论),因此一旦不再使用滚动,就应该使用clear-scroll API显式清除滚动:

  4. Scroll API :后台合并过程通过将较小的段合并在一起以创建新的更大的段来优化索引,此时较小的段将被删除。此过程在滚动期间继续,但开放的搜索上下文可防止旧段在仍在使用时被删除。这就是 Elasticsearch 能够返回初始搜索请求的结果,而不管文档的后续更改如何。保持较旧的段处于活动状态意味着需要更多的文件句柄。确保节点已配置为具有足够的可用文件句柄,并且在数据获取后立即清除滚动 API 上下文。我们可以使用节点统计 API 检查有多少搜索上下文打开:

因此,清除 Scroll API 上下文是非常有必要的,如前面“清除 Scroll API”部分所述。

来源