如何在NodeJs中使用异步数据源创建可读流?

swb*_*dit 19 javascript database asynchronous node.js express

环境: NodeJS,Express,DynamoDB(但真的可以是任何数据库)

场景: 需要读取大量记录并作为可下载文件返回给用户.这意味着我不能一次缓冲所有内容,然后在Express的响应中发送它.此外,我可能需要多次执行查询,因为在一个查询中可能不会返回所有数据.

建议的解决方案: 使用可以通过管道传输到Express中的响应流的可读流.

我首先创建了一个继承自stream.Readable的对象,并实现了一个推送查询结果的_read()方法.问题是_read()中调用的数据库查询是异步的,但stream.read()是同步方法.

当流通过管道传递给服务器的响应时,在db查询甚至有机会执行之前,会多次调用读取.因此,查询被多次调用,即使查询的第一个实例完成并执行push(null),其他查询也会完成,并且在EOF之后出现"push()"错误.

  1. 有没有办法用_read()正确地做到这一点?
  2. 我应该忘记_read()并只在构造函数中执行查询和push()结果吗?
  3. 我应该执行查询并发出数据事件而不是push()吗?

谢谢

function DynamoDbResultStream(query, options){
    if(!(this instanceof DynamoDbResultStream)){
        return new DynamoDbResultStream(query, options);
    }

    Readable.call(this, options);

    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function(){
    var self = this;
    if(!this.done){
        dynamoDB.query(this.dbQuery, function(err, data) {
            if (!err) {
                try{
                    for(i=0;i<data.Items.length;i++){
                        self.push(data.Items[i]);
                    }
                }catch(err){
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    //Next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                }else{
                    self.done=true;
                    self.push(null);
                }
            }else{
                 console.log(err);
                 self.emit('error',err);
            }
        });
    }else{
        self.push(null);
    }
};
Run Code Online (Sandbox Code Playgroud)

编辑: 发布此问题后,我发现这篇文章的答案显示如何在不使用继承的情况下执行此操作:如何在node.js可读流内调用异步函数

在那里发表评论说,在_read()中应该只有一个push().每个push()通常会生成另一个read()调用.

小智 1

注意 Stream 的不同模式:https ://nodejs.org/api/stream.html#stream_two_modes

\n\n
const Readable = require(\'stream\').Readable;\n\n// starts in paused mode\nconst readable = new Readable();\n\nlet i = 0;\nfetchMyAsyncData() {\n  setTimeout(() => {\n    // still remains in paused mode\n    readable.push(++i);\n\n    if (i === 5) {\n      return readable.emit(\'end\');\n    }\n\n    fetchMyAsyncData();\n  }, 500);    \n}\n\n// "The res object is an enhanced version of Node\xe2\x80\x99s own response object and supports all built-in fields and methods."\napp.get(\'/mystreamingresponse\', (req, res) => {\n\n  // remains in paused mode\n  readable.on(\'readable\', () => res.write(readable.read()));\n\n  fetchMyAsyncData();\n\n  // closes the response stream once all external data arrived\n  readable.on(\'end\', () => res.end());\n})\n
Run Code Online (Sandbox Code Playgroud)\n