Meteor:如何将大文件流式传输并解析为异步节点功能?

Joh*_*len 6 asynchronous stream node.js meteor

我正在使用作业集合包来执行以下操作:

  1. 下载包含大量有关网页元数据的大文件
  2. 使用NPM event-stream包从正则表达式拆分的文件元数据创建流
  3. 检查集合中的元数据是否匹配(我一直在尝试将每个网页的元数据流式传输到另一个函数来执行此操作)

该文件太大而无法缓冲,因此需要流式传输.如果您想尝试这个,这是一个包含一些元数据示例的小文件.

job-collection包中的每个作业都已经在异步函数中:

var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

function (job, callback) {

//This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {
    if (error) console.error('Error downloading File');
    if (response.statusCode !== 200) console.error(downloadResponse.statusCode, 'Status not 200');

    var responseEncoding = response.headers['content-type'];
    console.log('response encoding is %s', responseEncoding);
    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
      console.log('Received binary/octet-stream');
      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */
      searchPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous
    }));
    } else {
      console.error('Wrong encoding');
    }
  });
}

function searchWebPageMetaData(metaData) {
  //  Parse JSON and search collection for match
}
Run Code Online (Sandbox Code Playgroud)
  • 有没有更好的方法来构建这个?我是在正确的轨道上吗?
  • 在哪里放Meteor.bindEnvironment?- 每次传递到II时都会绑定环境searchWebPageMetaData()吗?我需要在这里明确使用纤维吗?
  • 如果我运行它,流在运行时会停止process.stdout.我应该将流放入Meteor的一个包裹中
  • 我知道Meteor.wrapAsync.我想将最里面的searchWebPageMetaData()函数包装进去Meteor.wrapAsync吗?(当我输入时,想想我回答的是)
  • 流会慢慢来补偿数据库调用的缓慢吗?我的猜测不是,但我该怎么处理?

我花了很长时间学习Meteor的wrapAsync,bindEnvironment但却无法将它们整合在一起并了解在哪里使用它们.

补充1

只是为了澄清,步骤是:

  1. 下载文件;
  2. 创建流;
  3. 解压缩;
  4. 将其拆分为单独的webPages - EventStream处理此问题
  5. 将其发送给函数 - 不需要返回值; 这可能是阻塞,只是一些搜索和数据库调用

我试图做这样的事情,除了我需要帮助的核心代码是在一个不同文件的函数中.以下代码中包含@ electric-jesus的大部分答案.

   processJobs('parseWatFile', {
     concurrency: 1,
     cargo: 1,
     pollInterval: 1000,
     prefetch: 1
   }, function (job, callback) {

     if (job.data.watZipFileLink) {
       queue.pause();
       console.log('queue should be paused now');


       var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';
       function searchPageMetaData(webpageMetaData, callback) {
         console.log(webpageMetaData);  // Would be nice to just get this function logging each webPageMetaData
         future.return(callback(webpageMetaData));  //I don't need this to return any value - do I have to return something?
     }

      if (!watFile)
        console.error('No watFile passed to downloadAndSearchWatFileForEntity ');

      var future = new Future(); // Doc Brown would be proud.

      if(typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');

    request({url: watFile, encoding: null}, function (error, response, body) {

      if (error)                        future.throw('Error Downloading File');
      if (response.statusCode !== 200)  future.throw('Expected status 200, got ' + response.statusCode + '.');

      var responseEncoding = response.headers['content-type'];

    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
        .pipe(EventStream.split(regexSplit))
        .pipe(EventStream.map(function (webpageMetaData) {
        searchPageMetaData(webpageMetaData, callback);
      })
    ));
    } else {
      future.throw('Wrong encoding');
    }
    });

    return future.wait();

    } else {
      console.log('No watZipFileLink for this job');
      job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
    }
      queue.resume();
      job.done;
      callback();
  }
Run Code Online (Sandbox Code Playgroud)

Set*_*aki 3

有趣,看起来还可以。我从未使用过job-collection,但它似乎只是一个 Mongo 驱动的任务队列..所以我假设它像常规队列一样工作。我总是发现带有回调的东西,我肯定会使用该Future模式。例如:

var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

var Future = Npm.require('fibers/future');


var searchWebPageMetaData = function (metaData) {
  //  Parse JSON and search collection for match
  // make it return something
  var result = /droids/ig.test(metaData);
  return result;
}

var processJob = function (job, callback) {

  var future = new Future(); // Doc Brown would be proud.

  if(typeof callback !== 'function') future.throw("Oops, you forgot that callbacks are supposed to be functions.. not undefined or whatever.");

  //This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {

    if (error)                        future.throw("Error Downloading File");
    if (response.statusCode !== 200)  future.throw("Expected status 200, got " + downloadResponse.statusCode + ".");

    var responseEncoding = response.headers['content-type'];


    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */

      // pass each metadatum to this function to update a collection - this function can be synchronous

      future.return(callback(webpageMetaData)); // this way, processJob returns whatever we find in the completed webpage, via callback.

    }));
    } else {
      future.throw('Wrong encoding');
    }
  });

  return future.wait();
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

所以每当你在这里分配变量时:

var currentJob = processJob(myjob, searchWebPageMetaData);
Run Code Online (Sandbox Code Playgroud)

即使使用同步类型获取/变量分配,您也可以及时完成并传输异步内容。

为了回答您的问题,

  • Meteor.bindEnvironment 放在哪里?- 每次传递给 searchWebPageMetaData() 时,我都会绑定环境吗?我需要在这里明确使用纤维吗?

    不是真的,我相信显式使用fibers/future已经解决了这个问题。

  • 如果我将它运行到 process.stdout,那么运行它时流会停止。我是否应该将流放入 Meteor 的其中一个包装中

    你是什​​么意思?我隐约记得 process.stdout 正在阻塞,这可能是一个原因。再次,将结果包装在 a 中future应该可以解决这个问题。

  • 我知道 Meteor.wrapAsync。我是否想将最里面的 searchWebPageMetaData() 函数包装在 Meteor.wrapAsync 中?(我想我在打字时回答的是)

    查看 Meteor.wrapAsync 帮助程序代码。它基本上是一个future应用的解决方案,当然你可以这样做,然后你也可以明确地fibers/future单独使用它,没有问题。

  • 流是否会减慢以补偿数据库调用的缓慢?我的猜测是不会,但我该如何处理呢?

    不太确定你在这里的意思..但由于我们正在尝试使用异步光纤,我的猜测也不是。我还没有看到使用光纤有任何缓慢的情况。可能只有当同时启动(并同时运行)多个作业时,您才会遇到内存使用方面的性能问题。保持并发队列较低,因为 Fibers 在同时运行东西方面非常强大。你只有一个核心来处理这一切,这是一个可悲的事实,因为节点不能多核:(