连续迭代mongodb游标(在移动到下一个文档之前等待回调)

UpT*_*eek 50 mongodb node.js mongoskin async.js

使用mongoskin,我可以执行这样的查询,它将返回一个游标:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      }
}
Run Code Online (Sandbox Code Playgroud)

但是,我想为每个文档调用一些异步函数,并且只有在调用它之后才转到光标上的下一个项目(类似于async.js模块中的eachSeries结构).例如:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });

      }
}  
Run Code Online (Sandbox Code Playgroud)

我怎么能这样做?

谢谢

更新:

我不想使用,toArray()因为这是一个大批量操作,结果可能不会一次性适合内存.

Tim*_*ple 50

如果您不想使用toArray将所有结果加载到内存中,则可以使用以下类似的方法迭代光标.

myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if(item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      resultCursor.nextObject(processItem);
    });

  }

  resultCursor.nextObject(processItem);
}  
Run Code Online (Sandbox Code Playgroud)

  • 对于大型数据集,此方法对我不起作用.我得到"RangeError:超出最大调用堆栈大小". (11认同)
  • @SoichiHayashi跟进@zamnuts - 你的堆栈溢出上面的例子是因为每次你处理一个项目,你运行另一个回调来处理当前一个*的处理函数中的下一个项目*.随着结果集的增长,您将循环执行更多函数调用,并且每个函数调用都会在前一个函数调用之上创建一个新的堆栈帧.在`process.nextTick`,`setImmediate`或`setTimeout`中包装异步回调会导致它在我们为处理每个文档而创建的调用堆栈的"外部"的下一个循环中运行. (4认同)
  • 那么`cursor.forEach()`呢? (3认同)
  • @SoichiHayashi在`process.nextTick`中包装异步函数或回调! (2认同)
  • @Redsandro - cursor.forEach() 没有提供异步方式来通知移动到下一个项目。 (2认同)

小智 49

一种更现代的方法,使用async/ await:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 异步迭代器到达时,这可能简单.
  • 您可能希望添加try/catch以进行错误检查.
  • 包含的函数应该是async或者代码应该包含在(async function() { ... })()它使用之后await.
  • 如果需要,await new Promise(resolve => setTimeout(resolve, 1000));在while循环结束时添加(暂停1秒)以显示它一个接一个地处理文档.

  • 很棒,这是最好的解决方案,不像选择的会崩溃 (4认同)
  • @Nico,对于迟到的回复很抱歉,但请注意注释中的第3点;) (2认同)

Jay*_*nki 20

从 node.js v10.3 开始,您可以使用异步迭代器

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}
Run Code Online (Sandbox Code Playgroud)

Jake Archibald 写了一篇关于异步迭代器的很棒的博客文章,我是在阅读@user993683 的回答后才知道的。


Dap*_*que 10

这适用于使用setImmediate的大型数据集:

var cursor = collection.find({filter...}).cursor();

cursor.nextObject(function fn(err, item) {
    if (err || !item) return;

    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}
Run Code Online (Sandbox Code Playgroud)