连接两个(或n)个流

Jo *_*iss 17 stream node.js eventemitter

  • 2个流:

    鉴于可读 stream1stream2,什么是地道(简洁)的方式来获取包含流stream1stream2级联

    我不能这样做stream1.pipe(outStream); stream2.pipe(outStream),因为那时流内容混杂在一起.

  • n个流:

    给定一个发出不确定数量的流的EventEmitter,例如

    eventEmitter.emit('stream', stream1)
    eventEmitter.emit('stream', stream2)
    eventEmitter.emit('stream', stream3)
    ...
    eventEmitter.emit('end')
    
    Run Code Online (Sandbox Code Playgroud)

    什么是一种惯用(简洁)的方式来获得所有流连接在一起的流

duc*_*ale 17

现在可以使用异步迭代器轻松完成此操作

async function* concatStreams(readables) {
  for (const readable of readables) {
    for await (const chunk of readable) { yield chunk }
  }
} 
Run Code Online (Sandbox Code Playgroud)

你可以像这样使用它

const fs = require('fs')
const stream = require('stream')

const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
const iterable = await concatStreams(files.map(f => fs.createReadStream(f)))

// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)
Run Code Online (Sandbox Code Playgroud)

有关异步迭代器的更多信息:https ://2ality.com/2019/11/nodejs-streams-async-iteration.html


Jo *_*iss 13

合并的流包会连接流.自述文件中的示例:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));
Run Code Online (Sandbox Code Playgroud)

我相信你必须立刻附加所有流.如果队列为空,则combinedStream自动结束.见问题#5.

流流库是一个具有明确的替代.end,但它更受欢迎,想必没有得到很好的测试.它使用Node 0.10的streams2 API(参见本讨论).


Ivo*_*Ivo 10

nodejs中,一个简单的reduce操作应该没问题!

const {PassThrough} = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
  return pt
}, new PassThrough())
Run Code Online (Sandbox Code Playgroud)

干杯;)

  • 警告:这将导致所有流并行传输到PassThrough流,而不考虑数据的排序,更可能破坏您的数据. (6认同)
  • @Ivo这个问题询问*连接*.因此,大多数参与本质量保证的读者都会关心订购.这个答案默默地误导了那些读者,因为流成功通过,但除非你检查输出,否则你永远不会知道它也混淆了你的所有数据(这个问题特别要求首先避免!).我恳请您将此信息添加到答案正文中. (3认同)
  • @LeonLi确实是此方法的目的。如果要保留订单,可以将与PassThrough不同的初始值传递给reduce函数;) (2认同)
  • 没有像 `stream.ended` 这样的东西。您必须在结束事件处理程序中设置 `s.ended = true`。 (2认同)

Fen*_*eng 9

这可以用vanilla nodejs来完成

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    let waiting = streams.length
    for (let stream of streams) {
        pass = stream.pipe(pass, {end: false})
        stream.once('end', () => --waiting === 0 && pass.emit('end'))
    }
    return pass
}
Run Code Online (Sandbox Code Playgroud)

  • 如果一个流永远不会结束,但另一个流却结束了怎么办 (2认同)
  • 这个解决方案很好,但由于某些奇怪的原因没有保持我使用中的流的顺序。调用“merge(a, b)”会生成一个流,其中“b”位于“a”之前。这是否与“b”是一个比“a”少得多的项目流并且首先结束这一事实有关? (2认同)

GTP*_*TPV 6

在 vanilla nodejs 中使用 ECMA 15+ 并结合IvoFeng的良好答案。

该类PassThrough是一个普通的Transform流,不会以任何方式修改该流。

const { PassThrough } = require('stream');

const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the stream ends, the counter is decremented.
    // Once the counter reaches 0, the mergedstream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  }, new PassThrough());
Run Code Online (Sandbox Code Playgroud)

可以这样使用:

const mergedStreams = concatStreams([stream1, stream2, stream3]);
Run Code Online (Sandbox Code Playgroud)