使用node.js流处理错误

B T*_*B T 149 stream node.js

处理流错误的正确方法是什么?我已经知道你可以听到一个'错误'事件,但我想知道一些关于任意复杂情况的更多细节.

对于初学者,当你想做一个简单的管道链时你会怎么做:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

您如何正确创建其中一个转换,以便正确处理错误?

更多相关问题:

  • 当错误发生时,'结束'事件会发生什么?它永远不会被解雇吗?它有时会被解雇吗?它取决于转换/流吗?这里的标准是什么?
  • 有什么机制可以通过管道传播错误吗?
  • 域名有效地解决了这个问题吗?例子很好.
  • 从"错误"事件中产生的错误是否有堆栈跟踪?有时?决不?有没有办法从他们那里得到一个?

msh*_*ren 205

转变

变换流既可读又可写,因此是非常好的"中间"流.出于这个原因,它们有时被称为through流.它们以这种方式类似于双工蒸汽,除了它们提供了一个很好的界面来操纵数据而不是仅通过它来发送数据.变换流的目的是在数据通过流传输时操纵数据.例如,您可能想要执行一些异步调用,或者派生几个字段,重新映射一些内容等.


您可以在哪里放置转换流


有关如何创建变换流的信息,请参见此处此处.你所要做的就是:

  1. 包括流模块
  2. 实例化(或继承)Transform类
  3. 实现一个_transform需要的方法(chunk, encoding, callback).

块是你的数据.如果你在工作,大多数时候你不需要担心编码objectMode = true.处理完块后调用回调.然后将该块推送到下一个流.

如果你想要一个很好的帮助模块,让你真的很容易通过流,我建议通过2.

对于错误处理,请继续阅读.

在管道链中,处理错误确实非常重要.根据这个线程 .pipe()不是为了转发错误而构建的.所以像......

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});
Run Code Online (Sandbox Code Playgroud)

...只会监听流上的错误c.如果发出错误事件,则a不会传递该事件,实际上会抛出.要正确执行此操作:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});
Run Code Online (Sandbox Code Playgroud)

现在,虽然第二种方式更详细,但您至少可以保留错误发生位置的上下文.这通常是件好事.

我觉得有一个图书馆很有帮助,但如果你有一个案例,你只想在目的地捕获错误而你不关心它发生的地方就是事件流.

结束

触发错误事件时,不会(显式)触发结束事件.发出错误事件将结束流.

根据我的经验,域名在大多数情况下都能很好地运作.如果您有未处理的错误事件(即在没有侦听器的流上发出错误),服务器可能会崩溃.现在,正如上面的文章所指出的,您可以将流包装在一个应该正确捕获所有错误的域中.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });
Run Code Online (Sandbox Code Playgroud)

域的美妙之处在于它们将保留堆栈跟踪.虽然事件流也很好.

如需更多阅读,请查看流手册.相当深入,但超级有用,并提供了一些很好的链接到许多有用的模块.

  • 请注意,您不需要在匿名函数中包装`.on('error')`处理程序,即`a.on('error',function(e){handleError(e)})``只能是`a .on('error',handleError)` (11认同)
  • [域名已被弃用](https://github.com/nodejs/node/issues/10843) (7认同)
  • “domain”模块正在被*弃用*:https://nodejs.org/api/domain.html (5认同)

Ben*_*dan 23

域名已弃用.你不需要它们.

对于这个问题,变换或可写之间的区别并不那么重要.

mshell_lauren的答案很棒,但作为替代方案,您还可以在您认为可能出错的每个流上显式侦听错误事件.如果您愿意,可以重用处理函数.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }
Run Code Online (Sandbox Code Playgroud)

这样做可以防止臭名昭着的未捕获异常,如果其中一个流触发其错误事件

  • 错误事件允许用户实现处理程序 (5认同)
  • 大声笑有乐趣地处理3个不同的错误事件,并祈祷写3个不同的流库的任何人正确实现了错误处理 (3认同)
  • @Alex Mills 1)处理3个事件的问题是什么,为什么它们"不同",当它们的类型相同时 - "错误",人们可能会因为每个事件都是不同的事实而解决; 2)除了本机Node.js功能之外,上面写的是哪些流式库?3)为什么它们如何在内部处理事件,这显然允许任何人在已经存在的任何东西上附加额外的错误处理程序? (3认同)

shu*_*son 20

如果使用的节点> = v10.0.0,则可以使用stream.pipelinestream.finished

例如:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});
Run Code Online (Sandbox Code Playgroud)

有关更多讨论,请参见此github PR

  • 您可能希望在管道和各个流之间以不同的方式处理错误。 (3认同)
  • 当“pipeline”已经有回调时,为什么要使用“finished”? (2认同)

小智 10

整个链中的错误可以使用一个简单的函数传播到最右边的流:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}
Run Code Online (Sandbox Code Playgroud)

可以使用如下:

safePipe(readable, [ transform1, transform2, ... ]);
Run Code Online (Sandbox Code Playgroud)


Vik*_*tam 5

.on("error", handler)只处理 Stream 错误,但如果您使用自定义 Transform 流,.on("error", handler)请不要捕获_transform函数内部发生的错误。所以可以做这样的事情来控制应用程序流:-

this_transform函数中的关键字指的是Stream它自己,它是一个EventEmitter. 因此,您可以使用try catch下面的方法来捕获错误,然后将它们传递给自定义事件处理程序。

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)
Run Code Online (Sandbox Code Playgroud)

这样,您可以将逻辑和错误处理程序分开。此外,您可以选择仅处理某些错误而忽略其他错误。

更新
替代方案:RXJS Observable


小智 5

使用multipipe包将多个流组合成一个双工流。并在一处处理错误。

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
Run Code Online (Sandbox Code Playgroud)