Joh*_*len 6 asynchronous stream node.js meteor
我正在使用 job-collection 包来完成以下工作:
使用 event-stream 包,从按正则表达式拆分的文件元数据创建一个流。该文件太大,无法整体缓冲,因此需要以流的方式处理。如果您想亲自试一试,这里有一个包含部分元数据示例的小文件。
job-collection包中的每个作业都已经在异步函数中:
var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');
function (job, callback) {
//This download is much too long to block
request({url: job.fileURL, encoding: null}, function (error, response, body) {
if (error) console.error('Error downloading File');
if (response.statusCode !== 200) console.error(downloadResponse.statusCode, 'Status not 200');
var responseEncoding = response.headers['content-type'];
console.log('response encoding is %s', responseEncoding);
if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
console.log('Received binary/octet-stream');
var regexSplit = /WARC\/1\./;
response.pipe(zlib.createGunzip()
.pipe(EventStream.split(regexSplit))
.pipe(EventStream.map(function (webpageMetaData) {
/* Need parse the metaData or pass each webpageMetaData to function
* This next function could block if it had to */
searchPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous
}));
} else {
console.error('Wrong encoding');
}
});
}
/**
 * Placeholder for the per-record handler.
 * TODO: parse the JSON in `metaData` and search the collection for a match.
 * @param {*} metaData - one metadata record.
 * @returns {undefined} nothing yet; the body is still unimplemented.
 */
function searchWebPageMetaData(metaData) {
  return undefined;
}
Run Code Online (Sandbox Code Playgroud)
Meteor.bindEnvironment 应该放在哪里?每次把数据传给 searchWebPageMetaData() 时,我都需要绑定环境吗?我需要在这里显式使用 Fibers 吗?如果我把它输出到 process.stdout,流在运行时会停止。我应该把流放进 Meteor 的某个包装器(例如 Meteor.wrapAsync)中吗?我是否应该把最里面的 searchWebPageMetaData() 函数用 Meteor.wrapAsync 包装起来?(打字的时候,我想我已经回答了自己:应该。)我花了很长时间学习 Meteor 的 wrapAsync 和 bindEnvironment,却始终无法把它们串起来,也不清楚该在哪里使用它们。
补充1
只是为了澄清,步骤是:
我尝试做类似下面的事情,只不过我需要帮助的核心代码位于另一个文件的函数中。下面的代码包含了 @electric-jesus 答案中的大部分内容。
/*
 * Worker for 'parseWatFile' jobs: streams a gzipped WAT file, splits it into
 * WARC records and logs each one. The job is completed exactly once
 * (job.done() or job.fail()), after which the queue is resumed and the
 * job-collection `callback` is invoked.
 */
processJobs('parseWatFile', {
  concurrency: 1,
  cargo: 1,
  pollInterval: 1000,
  prefetch: 1
}, function (job, callback) {
  // A job without a link cannot be processed: record why and release the worker.
  if (!job.data.watZipFileLink) {
    console.log('No watZipFileLink for this job');
    job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
    job.fail('No watZipFileLink for this job');
    callback();
    return;
  }

  // NOTE(review): `queue` is not declared in this snippet; presumably it is the
  // value returned by processJobs. With concurrency: 1 this pause/resume pair
  // may be unnecessary. Confirm.
  queue.pause();
  console.log('queue should be paused now');

  // TODO: hard-coded sample file; presumably this should become job.data.watZipFileLink.
  var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';

  var finished = false;

  // Completes the job once (success or failure), resumes the queue and hands
  // control back to job-collection.
  function finish(err) {
    if (finished) return;
    finished = true;
    if (err) {
      console.error(err);
      job.fail('' + err);
    } else {
      job.done();
    }
    queue.resume();
    callback();
  }

  // Per-record handler; for now it only logs the record and returns nothing.
  function searchPageMetaData(webpageMetaData) {
    console.log(webpageMetaData);
  }

  // No completion callback: passing one makes `request` buffer the entire body.
  var download = request({url: watFileUrl, encoding: null});

  download.on('error', function (error) {
    finish(new Error('Error Downloading File: ' + error.message));
  });

  download.on('response', function (response) {
    if (response.statusCode !== 200) {
      download.abort();
      finish(new Error('Expected status 200, got ' + response.statusCode + '.'));
      return;
    }

    var responseEncoding = response.headers['content-type'];
    // Compare against each accepted type explicitly; `x === 'a' || 'b'` is always truthy.
    if (responseEncoding !== 'application/octet-stream' &&
        responseEncoding !== 'binary/octet-stream') {
      download.abort();
      finish(new Error('Wrong encoding'));
      return;
    }

    var regexSplit = /WARC\/1\./;
    var gunzip = zlib.createGunzip();
    gunzip.on('error', finish);

    // Chain the pipes: .pipe() returns its destination, so data flows
    // download -> gunzip -> split -> map.
    download
      .pipe(gunzip)
      .pipe(EventStream.split(regexSplit))
      // bindEnvironment gives each record a Fiber with the Meteor environment;
      // exceptions are routed to finish() and fail the job.
      .pipe(EventStream.map(Meteor.bindEnvironment(function (webpageMetaData, next) {
        searchPageMetaData(webpageMetaData);
        // es.map requires this callback for every record, or the stream never ends.
        next();
      }, finish)))
      .on('error', finish)
      .on('end', function () {
        finish();
      });
  });
});
Run Code Online (Sandbox Code Playgroud)
有意思,看起来没问题。我从未用过 job-collection,但它似乎只是一个由 Mongo 驱动的任务队列,所以我假设它的工作方式和普通队列一样。对于基于回调的代码,我总是会使用 Future 模式。例如:
var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');
var Future = Npm.require('fibers/future');
/**
 * Placeholder matcher for a single metadata record.
 * TODO: parse the JSON and search the collection for a match instead.
 * @param {*} metaData - the record to test (coerced to a string by RegExp#test).
 * @returns {boolean} true when the text contains "droids", case-insensitively.
 */
var searchWebPageMetaData = function (metaData) {
  // The literal is re-created on every call, so the `g` flag's lastIndex
  // state cannot carry over between invocations.
  return /droids/ig.test(metaData);
};
/**
 * Streams job.fileURL through gunzip, splits it into WARC records and calls
 * `callback` on each record. Blocks the calling Fiber (via Future) until the
 * whole file has been processed, so it must be called from inside a Fiber.
 *
 * @param {Object} job - reads job.fileURL.
 * @param {Function} callback - invoked once per record with the record text.
 * @returns {Array} the value `callback` returned for each record, in completion order.
 * @throws {Error} if `callback` is not a function, the download fails, the
 *   status is not 200, the content-type is unexpected, decompression fails,
 *   or `callback` throws.
 */
var processJob = function (job, callback) {
  // Fail fast: there is no point starting a download without a handler.
  if (typeof callback !== 'function') {
    throw new Error("Oops, you forgot that callbacks are supposed to be functions.. not undefined or whatever.");
  }

  var future = new Future(); // Doc Brown would be proud.
  var results = [];
  var settled = false;

  // A Future may be resolved only once; every outcome goes through here.
  function settle(error) {
    if (settled) return;
    settled = true;
    if (error) {
      future.throw(error);
    } else {
      future.return(results);
    }
  }

  // No completion callback: passing one makes `request` buffer the entire body.
  var download = request({url: job.fileURL, encoding: null});

  download.on('error', function (error) {
    settle(new Error("Error Downloading File: " + error.message));
  });

  download.on('response', function (response) {
    if (response.statusCode !== 200) {
      download.abort();
      settle(new Error("Expected status 200, got " + response.statusCode + "."));
      return;
    }

    var responseEncoding = response.headers['content-type'];
    // Compare against each accepted type explicitly; `x === 'a' || 'b'` is always truthy.
    if (responseEncoding !== 'application/octet-stream' &&
        responseEncoding !== 'binary/octet-stream') {
      download.abort();
      settle(new Error('Wrong encoding'));
      return;
    }

    var regexSplit = /WARC\/1\./;
    var gunzip = zlib.createGunzip();
    gunzip.on('error', settle);

    // Chain the pipes: .pipe() returns its destination, so data flows
    // download -> gunzip -> split -> map.
    download
      .pipe(gunzip)
      .pipe(EventStream.split(regexSplit))
      // Stream callbacks run outside any Fiber; bindEnvironment gives `callback`
      // the Meteor environment so it may touch collections synchronously.
      .pipe(EventStream.map(Meteor.bindEnvironment(function (webpageMetaData, next) {
        results.push(callback(webpageMetaData));
        // es.map requires this callback for every record, or the stream never ends.
        next();
      }, settle)))
      .on('error', settle)
      .on('end', function () {
        settle();
      });
  });

  return future.wait();
};
Run Code Online (Sandbox Code Playgroud)
所以每当你在这里分配变量时:
// processJob ends with `return future.wait()`, so this call has to run inside a Fiber
// (e.g. server-side Meteor code). NOTE(review): `myjob` is not defined in this snippet.
var currentJob = processJob(myjob, searchWebPageMetaData);
Run Code Online (Sandbox Code Playgroud)
这样,即使采用同步风格的取值和变量赋值,异步的下载与流式处理也能按时完成。
为了回答您的问题,
Meteor.bindEnvironment 应该放在哪里?每次把数据传给 searchWebPageMetaData() 时,我都需要绑定环境吗?我需要在这里显式使用 Fibers 吗?
其实不需要,我认为显式使用 fibers/future 已经解决了这个问题。
如果我把它输出到 process.stdout,流在运行时会停止。我是否应该把流放进 Meteor 的某个包装器中?
你具体指什么?我隐约记得 process.stdout 是阻塞的,这可能是原因之一。同样,把结果包装在一个 future 中应该可以解决这个问题。
我了解 Meteor.wrapAsync。我是否应该把最里面的 searchWebPageMetaData() 函数用 Meteor.wrapAsync 包装起来?(打字的时候,我想我已经回答了自己:应该。)
请查看 Meteor.wrapAsync 辅助函数的源码。它本质上就是套用了 future 的解决方案。你当然可以这样做,不过单独显式地使用 fibers/future 也完全没有问题。
流是否会减慢以补偿数据库调用的缓慢?我的猜测是不会,但我该如何处理呢?
不太确定你这里指的是什么……但由于我们使用的是异步的 Fibers,我的猜测也是不会。我还没见过使用 Fibers 会导致任何变慢的情况。只有在同时启动(并同时运行)多个作业时,才可能遇到内存占用方面的性能问题。请把队列并发数保持在较低水平,因为 Fibers 在并发运行任务方面非常强大。遗憾的是,你只有一个核心来处理这一切,因为 Node 无法利用多核 :(