bli*_*ter 6 node.js promise backpressure es6-promise node-streams
我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promise
s,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)
程序on
,因为同步和异步代码不会混合。
或者它们可以混合,但我只是不知道如何混合?
我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")
事件中执行持久性 API 请求,让on()
函数以某种方式徘徊直到脏 API 工作完成?
就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")
我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。
async importCSV(filePath) {
let parsedNum = 0, processedNum = 0;
async function* gen() {
let pf = yield;
do {
pf = yield await pf();
} while (typeof pf === "function");
};
var g = gen();
g.next();
await new Promise((resolve, reject) => {
try {
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
parseStream.on("data", row => {
// Received a CSV row from Papa.parse()
try {
console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
);
parsedNum++;
// Simulate some really slow async/await dirty work here, for example
// send requests to a one-at-a-time persistence API
g.next(() => { // don't execute now, call in sequence via the generator above
return new Promise((res, rej) => {
console.log(
"DW#", processedNum, ": dirty work START",
row.filter((e, i) => i <= 2 ? e : undefined)
);
setTimeout(() => {
console.log(
"DW#", processedNum, ": dirty work STOP ",
row.filter((e, i) => i <= 2 ? e : undefined)
);
processedNum++;
res();
}, 1000)
})
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
parseStream.on("finish", () => {
console.log(`Parsed ${parsedNum} rows`);
resolve();
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
while(!(await g.next()).done);
}
Run Code Online (Sandbox Code Playgroud)
那么爸爸为什么要着急呢?为什么不允许我慢一点处理文件 - 原始 CSV 文件中的数据不会消失,我们有几个小时来完成流式传输,为什么要用我似乎on("data")
无法放慢速度的事件来折磨我?
所以我真正需要的是让爸爸变得更像爷爷,并最大限度地减少或消除 CSV 行的任何排队或缓冲。理想情况下,我能够将 Papa 的解析事件与我的 API 的速度(或缺乏速度)完全同步。因此,如果不是因为异步代码无法使同步代码“休眠”的教条,我理想情况下会将每个 CSV 行发送到Papa 事件内的API ,然后才进行控制权返回给 Papa。
建议?事件处理程序与我的异步 API 的缓慢性之间的某种“松散耦合”也很好。我不介意是否有几百行排队。但当数万个堆积起来时,我很快就会耗尽堆。
为什么要用我
on("data")
似乎无法放慢速度的事情来打击我?
你可以,你只是没有要求爸爸停下来。您可以通过调用 来做到这一点stream.pause()
,然后再stream.resume()
调用 Node 流的内置背压。
然而,有一个比在基于回调的代码中自己处理这个问题更好的 API 可以使用:使用流作为异步迭代器!当您await
处于循环体中时for await
,生成器也必须暂停。所以你可以写
async importCSV(filePath) {
let parsedNum = 0;
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
for await (const row of parseStream) {
// Received a CSV row from Papa.parse()
const data = row.filter((e, i) => i <= 2 ? e : undefined);
console.log("PA#", parsedNum, ": parsed", data);
parsedNum++;
await dirtyWork(data);
}
console.log(`Parsed ${parsedNum} rows`);
}
importCSV('sample.csv').catch(console.error);
let processedNum = 0;
function dirtyWork(data) {
// Simulate some really slow async/await dirty work here,
// for example send requests to a one-at-a-time persistence API
return new Promise((res, rej) => {
console.log("DW#", processedNum, ": dirty work START", data)
setTimeout(() => {
console.log("DW#", processedNum, ": dirty work STOP ", data);
processedNum++;
res();
}, 1000);
});
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1612 次 |
最近记录: |