如何等待所有批量写入在弹性搜索API中完成

Ric*_*d G 6 elasticsearch

使用NodeJS弹性搜索客户端.尝试编写数据导入程序以批量导入MongoDB中的文档.我遇到的问题是索引刷新似乎不等到所有文档在检查计数之前写入弹性.

使用节点中的流API将记录读入批处理,然后使用弹性API批处理命令写入记录.如下所示:

function rebuildIndex(modelName, queryStream, openStream, done) {
    logger.debug('Rebuilding %s index', modelName);
    async.series([
        function (next) {
          deleteType(modelName, function (err, result) {
            next(err, result);
          });
        },
        function (next) {
          var Model;
          var i = 0;
          var batchSize = settings.indexBatchSize;
          var batch = [];
          var stream;

          if (queryStream && !openStream) {
            stream = queryStream.stream();
          } else if (queryStream && openStream) {
            stream = queryStream;
          }else
          {
            Model = mongoose.model(modelName);
            stream = Model.find({}).stream();
          }

          stream.on("data", function (doc) {
            logger.debug('indexing %s', doc.userType);
            batch.push({
              index: {
                "_index": settings.index,
                "_type": modelName.toLowerCase(),
                "_id": doc._id.toString()
              }
            });
            var obj;
            if (doc.toObject){
              obj = doc.toObject();
            }else{
              obj = doc;
            }
            obj = _.clone(obj);

            delete obj._id;
            batch.push(obj);
            i++;
            if (i % batchSize == 0) {
              console.log(chalk.green('Loaded %s records'), i);
              client().bulk({
                body: batch
              }, function (err, resp) {
                if (err) {
                  next(err);
                } else if (resp.errors) {
                  next(resp);
                }
              });
              batch = [];
            }
          });

          // When the stream ends write the remaining records
          stream.on("end", function () {
            if (batch.length > 0) {
              console.log(chalk.green('Loaded %s records'), batch.length / 2);
              client().bulk({
                body: batch
              }, function (err, resp) {
                if (err) {
                  logger.error(err, 'Failed to rebuild index');
                  next(err);
                } else if (resp.errors) {
                  logger.error(resp.errors, 'Failed to rebuild index');
                  next(resp);
                } else {
                  logger.debug('Completed rebuild of %s index', modelName);
                  next();
                }
              });
            } else {
              next();
            }

            batch = [];
          })
        }

      ],
      function (err) {
        if (err)
          logger.error(err);
        done(err);
      }
    );
  }
Run Code Online (Sandbox Code Playgroud)

我使用这个助手来检查索引中的文档计数.没有超时,索引中的计数是错误的,但是超时时它们没问题.

/**
   * A helper function to count the number of documents in the search index for a particular type.
   * @param type The type, e.g. User, Customer etc.
   * @param done A callback to report the count.
   */
  function checkCount(type, done) {
    async.series([
      function(next){
        setTimeout(next, 1500);
      },
      function (next) {
        refreshIndex(next);
      },
      function (next) {
        client().count({
          "index": settings.index,
          "type": type.toLowerCase(),
          "ignore": [404]
        }, function (error, count) {
          if (error) {
            next(error);
          } else {
            next(error, count.count);
          }
        });
      }
    ], function (err, count) {
      if (err)
        logger.error({"err": err}, "Could not check index counts.");
      done(err, count[2]);
    });
  }
Run Code Online (Sandbox Code Playgroud)

这个助手应该在更新完成后刷新索引:

// required to get results to show up immediately in tests. Otherwise there's a 1 second delay
  // between adding an entry and it showing up in a search.
  function refreshIndex(done) {
    client().indices.refresh({
      "index": settings.index,
      "ignore": [404]
    }, function (error, response) {
      if (error) {
        done(error);
      } else {
        logger.debug("deleted index");
        done();
      }
    });
  }
Run Code Online (Sandbox Code Playgroud)

加载器工作正常,但由于批量加载和计数检查之间的时间因此测试失败:

it('should be able to rebuild and reindex customer data', function (done) {
    this.timeout(0); // otherwise the stream reports a timeout error
    logger.debug("Testing the customer reindexing process");

    // pass null to use the generic find all query
    searchUtils.rebuildIndex("Customer", queryStream, false, function () {
      searchUtils.checkCount("Customer", function (err, count) {
        th.checkSystemErrors(err, count);
        count.should.equal(volume.totalCustomers);
        done();
      })
    });
  });
Run Code Online (Sandbox Code Playgroud)

我观察了测试计数的随机结果.通过人工延迟(setTimeoutcheckCount函数中),计数匹配.因此,我得出结论,文档最终写成弹性,测试将通过.我认为indices.refresh这本质上会强制等待,直到文档全部写入索引,但它似乎没有使用这种方法.

当卷达到实际生产水平时,setTimeout hack并不是真正可持续的......那么在检查文档数量之前,如何确保批量调用完全写入弹性索引?

Tro*_*roy 6

看看"refresh"参数(elasticsearch文档)

例如:

let bulkUpdatesBody = [ bulk actions / docs to index go here ]
client.bulk({
  refresh: "wait_for",
  body: bulkUpdatesBody
});
Run Code Online (Sandbox Code Playgroud)