swb*_*dit 19 javascript database asynchronous node.js express
环境: NodeJS,Express,DynamoDB(但真的可以是任何数据库)
场景: 需要读取大量记录并作为可下载文件返回给用户.这意味着我不能一次缓冲所有内容,然后在Express的响应中发送它.此外,我可能需要多次执行查询,因为在一个查询中可能不会返回所有数据.
建议的解决方案: 使用可以通过管道传输到Express中的响应流的可读流.
我首先创建了一个继承自stream.Readable的对象,并实现了一个推送查询结果的_read()方法.问题是_read()中调用的数据库查询是异步的,但stream.read()是同步方法.
当流通过管道传递给服务器的响应时,在db查询甚至有机会执行之前,会多次调用读取.因此,查询被多次调用,即使查询的第一个实例完成并执行push(null),其他查询也会完成,并且在EOF之后出现"push()"错误.
谢谢
function DynamoDbResultStream(query, options){
if(!(this instanceof DynamoDbResultStream)){
return new DynamoDbResultStream(query, options);
}
Readable.call(this, options);
this.dbQuery = query;
this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);
DynamoDbResultStream.prototype._read = function(){
var self = this;
if(!this.done){
dynamoDB.query(this.dbQuery, function(err, data) {
if (!err) {
try{
for(i=0;i<data.Items.length;i++){
self.push(data.Items[i]);
}
}catch(err){
console.log(err);
}
if (data.LastEvaluatedKey) {
//Next read() should invoke the query with a new start key
self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
}else{
self.done=true;
self.push(null);
}
}else{
console.log(err);
self.emit('error',err);
}
});
}else{
self.push(null);
}
};
Run Code Online (Sandbox Code Playgroud)
编辑: 发布此问题后,我发现这篇文章的答案显示如何在不使用继承的情况下执行此操作:如何在node.js可读流内调用异步函数
在那里发表评论说,在_read()中应该只有一个push().每个push()通常会生成另一个read()调用.
小智 1
注意 Stream 的不同模式:https ://nodejs.org/api/stream.html#stream_two_modes
\n\nconst Readable = require(\'stream\').Readable;\n\n// starts in paused mode\nconst readable = new Readable();\n\nlet i = 0;\nfetchMyAsyncData() {\n setTimeout(() => {\n // still remains in paused mode\n readable.push(++i);\n\n if (i === 5) {\n return readable.emit(\'end\');\n }\n\n fetchMyAsyncData();\n }, 500); \n}\n\n// "The res object is an enhanced version of Node\xe2\x80\x99s own response object and supports all built-in fields and methods."\napp.get(\'/mystreamingresponse\', (req, res) => {\n\n // remains in paused mode\n readable.on(\'readable\', () => res.write(readable.read()));\n\n fetchMyAsyncData();\n\n // closes the response stream once all external data arrived\n readable.on(\'end\', () => res.end());\n})\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
1876 次 |
| 最近记录: |