在Node.js中解析巨大的日志文件 - 逐行读取

vic*_*ooi 104 parsing logfile-analysis node.js

我需要在Javascript/Node.js中解析大型(5-10 Gb)日志文件(我正在使用Cube).

日志看起来像:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
Run Code Online (Sandbox Code Playgroud)

我们需要读取每一行,进行一些解析(例如剥离5,7然后SUCCESS),然后使用他们的JS客户端将这些数据泵入Cube(https://github.com/square/cube).

首先,Node中逐行读取文件的规范方式是什么?

这似乎是在线相当常见的问题:

许多答案似乎指向一堆第三方模块:

但是,这似乎是一项相当基本的任务 - 当然,在stdlib中有一种简单的方法可以逐行读取文本文件吗?

其次,我需要处理每一行(例如,将时间戳转换为Date对象,并提取有用的字段).

什么是最好的方法,最大化吞吐量?是否有某种方法不会阻止每行读取或将其发送到Cube?

第三 - 我猜测使用字符串拆分,并且JS等价的contains(IndexOf!= -1?)将比正则表达式快得多?有没有人在Node.js中解析大量文本数据方面有很多经验?

干杯,维克多

Ger*_*ard 183

我搜索了一个解决方案,使用流逐行解析非常大的文件(gbs).所有第三方库和示例都不符合我的需要,因为他们不是逐行处理文件(如1,2,3,4 ..)或将整个文件读取到内存中

以下解决方案可以使用流和管道逐行解析非常大的文件.为了测试,我使用了一个带有17.000.000记录的2.1 GB文件.Ram的使用量不超过60 mb.

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

请让我知道它是怎么回事!

  • 仅供参考,此代码不同步.这是异步的.如果在代码的最后一行之后插入`console.log(lineNr)`,它将不会显示最终行数,因为文件是异步读取的. (4认同)
  • 谢谢,这是我能找到的唯一一个实际上暂停并恢复的解决方案.Readline没有. (3认同)
  • event-stream 被泄露: https://medium.com/intrinsic/compromished-npm-package-event-stream-d47d08605502 但 4+ 显然是安全的 https://blog.npmjs.org/post/180565383195/details-about -事件流事件 (3认同)
  • 很棒的例子,它确实暂停了.另外,如果您决定提前停止文件读取,可以使用`s.end();` (2认同)
  • 像魅力一样工作。用它来索引 1.5 亿个文档到 elasticsearch 索引。`readline` 模块很痛苦。它不会暂停并且每次在 40-5000 万之后都会导致失败。浪费了一天。非常感谢您的回答。这个完美地工作 (2认同)

use*_*109 64

您可以使用内置readline包,请参阅此处的文档.我使用stream来创建一个新的输出流.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});
Run Code Online (Sandbox Code Playgroud)

大文件需要一些时间来处理.告诉它是否有效.

  • 如所写,倒数第二行失败,因为未定义 cubestuff。 (2认同)
  • 使用`readline`,是否可以暂停/恢复读取流以在"do stuff"区域中执行异步操作? (2认同)
  • 当我尝试暂停/恢复时,@jchook `readline` 给我带来了很多问题。如果下游过程较慢,它不会正确暂停流,从而产生很多问题 (2认同)

amb*_*odi 27

我真的很喜欢@gerard的答案,这实际上应该是正确答案.我做了一些改进:

  • 代码属于一个类(模块化)
  • 解析包括在内
  • 如果异步作业被链接到读取CSV(如插入到数据库)或HTTP请求,则会向外部提供恢复能力
  • 读取用户可以声明的块/批量大小.我也在流中处理编码,以防你有不同编码的文件.

这是代码:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader
Run Code Online (Sandbox Code Playgroud)

所以基本上,这是你将如何使用它:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())
Run Code Online (Sandbox Code Playgroud)

我用35GB的CSV文件对它进行了测试,这对我有用,这就是为什么我选择在@gerard的答案上构建它 ,欢迎反馈.


Jai*_*mez 12

Node.js 文档提供了一个使用 Readline 模块的非常优雅的示例。

示例:逐行读取文件流

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});
Run Code Online (Sandbox Code Playgroud)

注意:我们使用 crlfDelay 选项将 CR LF ('\r\n') 的所有实例识别为单个换行符。


小智 11

我使用https://www.npmjs.com/package/line-by-line从文本文件中读取超过1 000 000行.在这种情况下,RAM的占用容量约为50-60兆字节.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Run Code Online (Sandbox Code Playgroud)


Sri*_*tha 6

使用原生 Nodejs 模块(fs、readline)的流读取/写入文件:

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
                                       input:  fs.createReadStream('input.json'),
                                       output: fs.createWriteStream('output.json')
                                    });

rl.on('line', function(line) {
    console.log(line);

    // Do any 'line' processing if you want and then write to the output file
    this.output.write(`${line}\n`);
});

rl.on('close', function() {
    console.log(`Created "${this.output.path}"`);
});
Run Code Online (Sandbox Code Playgroud)


Kri*_*ofe 5

除了逐行读取大文件之外,您还可以逐块读取它。有关更多信息,请参阅本文

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Run Code Online (Sandbox Code Playgroud)

  • 难道下面应该是比较而不是赋值:“if(bytesRead = chunkSize)”? (3认同)