Memory-efficient growing Node.js Duplex/Transform stream

Sof*_*son 5 node.js

I am trying to add variables to a template at specific indexes, using streams.

[Image]

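The idea is that I have a readable stream and a list of variables, each of which can be a readable stream, a buffer, or a string of undetermined size. These variables are to be inserted at a predefined list of indexes. Based on my assumptions and what I have tried so far, I have a few questions.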

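My first attempt was to do this manually with readable streams. However, I could not call const buffer = templateIn.read(size) (the buffers were still empty) before "template combined" tried to read from it. The solution to that problem resembles how you would use a transform stream, so that was the next step I took.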

However, I ran into a problem with transform streams. Pseudocode like the following piles buffers up in memory until done() is called.

public _transform(chunk: Buffer, encoding: string, done: (err?: Error, data?: any) => void): void {
    let index = 0;
    while (index < chunk.length) {
        if (index === this.variableIndex) {  // the basic idea (the actual logic is a bit more complex)
            this.insertStreamHere(index);
        } else {
            // continue reading stream normally
        }
        index++; // advance in both branches, otherwise the loop never terminates
    }
    done();
}

From: https://github.com/nodejs/node/blob/master/lib/_stream_transform.js

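In a transform stream, the written data is placed in a buffer. When _read(n) is called, it transforms the queued up data, calling the buffered _write cb's as it consumes chunks. If consuming a single written chunk would result in multiple output chunks, then the first outputted bit calls the readcb, and subsequent chunks just go into the read buffer, and will cause it to emit 'readable' if necessary.

This way, back-pressure is actually determined by the reading side, since _read has to be called to start processing a new chunk. However, a pathological inflate type of transform can cause excessive buffering here. For example, imagine a stream where every byte of input is interpreted as an integer from 0-255, and then results in that many bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in 1kb of data being output. In this case, you could write a very small amount of input, and end up with a very large amount of output. In such a pathological inflating mechanism, there'd be no way to tell the system to stop doing the transform. A single 4MB write could cause the system to run out of memory.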
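
To make the quoted scenario concrete, here is a minimal sketch of such an inflating transform. The name inflate and the tiny readableHighWaterMark of 16 are assumptions chosen only to make the overshoot visible; this is not code from the Node.js source.

```typescript
import { Transform } from "stream";

// Hypothetical "inflating" transform: each input byte b yields b output bytes.
const inflate = new Transform({
    readableHighWaterMark: 16, // deliberately tiny, to make the overshoot visible
    transform(chunk, _encoding, callback) {
        for (let i = 0; i < chunk.length; i++) {
            // push() returns false once the read buffer is over its
            // highWaterMark, but the data is buffered regardless.
            this.push(Buffer.alloc(chunk[i], 0x61));
        }
        callback();
    },
});

// Four input bytes are written while nobody is reading the output.
inflate.write(Buffer.from([0xff, 0xff, 0xff, 0xff]));

const buffered = inflate.readableLength;
console.log(buffered, ">", inflate.readableHighWaterMark); // prints: 1020 > 16
```

All 4 x 255 = 1020 output bytes end up in the read buffer, because everything pushed inside a single _transform call is buffered no matter what the high water mark is.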

So, TL;DR: how do I insert (large) streams at specific indexes without building up a huge backlog of buffers in memory? Any advice is appreciated.

Sof*_*son 5

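After a lot of reading of the documentation and the source code, plenty of trial and error, and some testing, I have come up with a solution to my problem. I could just copy and paste my solution, but for the sake of completeness I will explain my findings here.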

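Handling backpressure with pipes consists of a few parts. We have the Readable, which writes data to the Writable. The Readable provides the Writable with a callback through which the Writable can tell the Readable that it is ready to receive a new chunk of data. The reading part is simpler. The Readable has an internal buffer. Using Readable.push() adds data to that buffer, and when data is read, it comes from this internal buffer. Next to that, we can use Readable.readableHighWaterMark and Readable.readableLength to make sure we do not push too much data at once.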

Readable.readableHighWaterMark - Readable.readableLength

is the maximum number of bytes we should push to this internal buffer.
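
As a small illustration of that expression, here is a sketch with a hypothetical helper availableSpace and an arbitrary highWaterMark of 8 bytes. Neither is part of my actual code.

```typescript
import { Readable } from "stream";

// Hypothetical helper: bytes that still fit before highWaterMark is reached.
function availableSpace(readable: Readable): number {
    return Math.max(0, readable.readableHighWaterMark - readable.readableLength);
}

const source = new Readable({
    highWaterMark: 8,
    read() { /* data is pushed manually below */ },
});

const before = availableSpace(source); // nothing buffered yet
source.push(Buffer.from("abc"));       // 3 bytes go into the internal buffer
const after = availableSpace(source);
console.log(before, after); // prints: 8 5
```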

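So this means that, since we want to read from two Readable streams at the same time, we need two Writable streams to control the flow. To merge the data we need to buffer it ourselves, since (as far as I know) a Writable has no internal buffer that we can read from. So a Duplex stream is the best option, because we want to handle the buffering, writing and reading ourselves.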

Writing

Now let's get to the code. To keep track of the state of multiple streams, we create a state interface, which looks as follows:

declare type StreamCallback = (error?: Error | null) => void;

interface MergingState {
    callback: StreamCallback;
    queue: BufferList;
    highWaterMark: number;
    size: number;
    finalizing: boolean;
}

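callback holds the last callback provided by either write or final (we will get to final later). highWaterMark indicates the maximum size of our queue, and size is the current size of the queue. Lastly, the finalizing flag indicates that the current queue is the last queue, so once the queue is empty we are done reading the stream belonging to that state.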

BufferList is a copy of the internal Node.js implementation that is used for the built-in streams.
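
For readers who do not want to copy Node's internal class, the sketch below shows a simplified stand-in that supports only what the state needs. SimpleBufferList and this version of newMergingState are hypothetical illustrations, not the real BufferList and not necessarily how my complete code creates its state.

```typescript
// Hypothetical, simplified stand-in for the internal BufferList.
class SimpleBufferList {
    private chunks: Buffer[] = [];
    public length = 0; // total number of bytes queued

    push(chunk: Buffer): void {
        this.chunks.push(chunk);
        this.length += chunk.length;
    }

    // Remove and return the first n bytes (or everything if fewer are queued).
    consume(n: number): Buffer {
        const take = Math.min(n, this.length);
        const out = Buffer.alloc(take);
        let offset = 0;
        while (offset < take) {
            const head = this.chunks[0];
            const needed = take - offset;
            if (head.length <= needed) {
                head.copy(out, offset);   // the whole head chunk is consumed
                offset += head.length;
                this.chunks.shift();
            } else {
                head.copy(out, offset, 0, needed); // only part of the head chunk
                this.chunks[0] = head.subarray(needed);
                offset += needed;
            }
        }
        this.length -= take;
        return out;
    }
}

type StreamCallback = (error?: Error | null) => void;

interface SketchState {
    callback: StreamCallback | null;
    queue: SimpleBufferList;
    highWaterMark: number;
    size: number;
    finalizing: boolean;
}

// Assumed shape of a factory for a fresh, empty state.
function newMergingState(highWaterMark: number): SketchState {
    return { callback: null, queue: new SimpleBufferList(), highWaterMark, size: 0, finalizing: false };
}

const state = newMergingState(16);
state.queue.push(Buffer.from("hello"));
state.queue.push(Buffer.from("world"));
const firstPart = state.queue.consume(7).toString();
console.log(firstPart, state.queue.length); // prints: hellowo 3
```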

As mentioned before, the writable handles the backpressure, so the generalized method for both of our writables looks as follows:

/**
 * Method to write to provided state if it can
 *
 * (Will unshift the bytes that cannot be written back to the source)
 *
 * @param src the readable source that writes the chunk
 * @param chunk the chunk to be written
 * @param encoding the chunk encoding, currently not used
 * @param cb the streamCallback provided by the writing state
 * @param state the state which should be written to
 */
private writeState(src: Readable, chunk: Buffer, encoding: string, cb: StreamCallback, state: MergingState): void {
    this.mergeNextTick();
    const bytesAvailable = state.highWaterMark - state.size;
    if (chunk.length <= bytesAvailable) {
        // safe to write to our local buffer
        state.queue.push(chunk);
        state.size += chunk.length;
        if (chunk.length === bytesAvailable) {
            // our queue is full, so store our callback
            this.stateCallbackAndSet(state, cb);
        } else {
            // we still have some space, so we can call the callback immediately
            cb();
        }
        return;
    }

    if (bytesAvailable === 0) {
        // no space available, unshift entire chunk
        src.unshift(chunk);
    } else {
        state.size += bytesAvailable;
        const leftOver = Buffer.alloc(chunk.length - bytesAvailable);
        chunk.copy(leftOver, 0, bytesAvailable);
        // push amount of bytes available
        state.queue.push(chunk.slice(0, bytesAvailable));
        // unshift what we cannot fit in our queue
        src.unshift(leftOver);
    }
    this.stateCallbackAndSet(state, cb);
}

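First we check how much space is available for buffering. If there is enough space for the full chunk, we buffer it. If there is no space available, we unshift the whole chunk back to its readable source. If there is some space available, we only unshift the part that does not fit. If our buffer is full, we store the callback that requests a new chunk. If there is still space, we request the next chunk right away.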
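
The three cases can be seen in isolation with a small pure function. splitChunk is a hypothetical helper written for this explanation. Unlike writeState above, it returns views on the original chunk via subarray instead of copying the leftover bytes.

```typescript
// Hypothetical helper mirroring the three cases: fits, no room, partial fit.
function splitChunk(chunk: Buffer, bytesAvailable: number): { accepted: Buffer; leftOver: Buffer } {
    const room = Math.max(0, Math.min(bytesAvailable, chunk.length));
    return {
        accepted: chunk.subarray(0, room), // would go into our queue
        leftOver: chunk.subarray(room),    // would be handed back via src.unshift()
    };
}

const chunk = Buffer.from("abcdef");
const fits = splitChunk(chunk, 10);   // whole chunk accepted
const full = splitChunk(chunk, 0);    // whole chunk handed back
const partial = splitChunk(chunk, 4); // "abcd" accepted, "ef" handed back
console.log(partial.accepted.toString(), partial.leftOver.toString()); // prints: abcd ef
```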

this.mergeNextTick() is called because our state has changed and should be read in the next tick:

private mergeNextTick(): void {
    if (!this.mergeSync) {
        // make sure it is only called once per tick
        // we don't want to call it multiple times
        // since there will be nothing left to read the second time
        this.mergeSync = true;
        process.nextTick(() => this._read(this.readableHighWaterMark));
    }
}
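
The once-per-tick guard can be tried on its own. TickScheduler is a hypothetical class for illustration. It resets its flag inside the scheduled callback, whereas in my stream the flag is reset at the start of _read.

```typescript
// Hypothetical stand-alone version of the "schedule at most once per tick" guard.
class TickScheduler {
    private pending = false;

    // Returns true if the task was scheduled, false if one is already pending.
    schedule(task: () => void): boolean {
        if (this.pending) {
            return false;
        }
        this.pending = true;
        process.nextTick(() => {
            this.pending = false; // allow scheduling again from the next tick on
            task();
        });
        return true;
    }
}

let runs = 0;
const scheduler = new TickScheduler();
const results = [1, 2, 3].map(() => scheduler.schedule(() => { runs++; }));
console.log(results); // prints: [ true, false, false ]
```

Only the first of the three calls actually queues work, so the task runs once in the next tick.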

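this.stateCallbackAndSet is a helper function that calls our last stored callback, to make sure we do not get into a state in which the stream stops flowing, and then stores the newly provided callback.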

/**
 * Helper function to call the callback if it exists and set the new callback
 * @param state the state which holds the callback
 * @param cb the new callback to be set
 */
private stateCallbackAndSet(state: MergingState, cb: StreamCallback): void {
    if (!state) {
        return;
    }
    if (state.callback) {
        const callback = state.callback;
        // do callback next tick, such that we can't get stuck in a writing loop
        process.nextTick(() => callback());
    }
    state.callback = cb;
}

Reading

Now on to the reading side. This is the part where we handle selecting the correct stream.

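First, our function to read a state, which is pretty straightforward. It reads as many bytes as it is able to read, and it returns the number of bytes written, which is useful information for our other functions.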

/**
 * Method to read the provided state if it can
 *
 * @param size the number of bytes to consume
 * @param state the state from which needs to be read
 * @returns the amount of bytes read
 */
private readState(size: number, state: MergingState): number {
    if (state.size === 0) {
        // our queue is empty so we read 0 bytes
        return 0;
    }
    let buffer = null;
    if (state.size < size) {
        buffer = state.queue.consume(state.size, false);
    } else {
        buffer = state.queue.consume(size, false);
    }
    this.push(buffer);
    this.stateCallbackAndSet(state, null);
    state.size -= buffer.length;
    return buffer.length;
}

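The doRead method is where all the merging takes place. It fetches the next merging index. If the merging index is END, we can simply read the writingState until the end of the stream. If we are at the merging index, we read from the mergingState. Otherwise we read from the writingState until we reach the next merging index.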

/**
 * Method to read from the correct Queue
 *
 * The doRead method is called multiple times by the _read method until
 * it is satisfied with the returned size, or until no more bytes can be read
 *
 * @param n the number of bytes that can be read until highWaterMark is hit
 * @throws Errors when something goes wrong, so wrap this method in a try catch.
 * @returns the number of bytes read from either buffer
 */
private doRead(n: number): number {
    // first check all constants below 0,
    // which is only Merge.END right now
    const nextMergingIndex = this.getNextMergingIndex();
    if (nextMergingIndex === Merge.END) {
        // read writing state until the end
        return this.readWritingState(n);
    }
    const bytesToNextIndex = nextMergingIndex - this.index;
    if (bytesToNextIndex === 0) {
        // We are at the merging index, thus should read merging queue
        return this.readState(n, this.mergingState);
    }
    if (n <= bytesToNextIndex) {
        // We are safe to read n bytes
        return this.readWritingState(n);
    }
    // read the bytes until the next merging index
    return this.readWritingState(bytesToNextIndex);
}
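
The decision doRead makes can be separated from the actual reading. In the sketch below, planRead and ReadPlan are hypothetical names, and END = -1 is an assumption: the code above only tells us that Merge.END is a constant below 0.

```typescript
const END = -1; // assumption: stands in for Merge.END, a negative sentinel

type ReadPlan = { source: "writing" | "merging"; bytes: number };

// Decide which queue to read from and how many bytes may be taken at most.
function planRead(n: number, index: number, nextMergingIndex: number): ReadPlan {
    if (nextMergingIndex === END) {
        return { source: "writing", bytes: n }; // nothing left to merge
    }
    const bytesToNextIndex = nextMergingIndex - index;
    if (bytesToNextIndex === 0) {
        return { source: "merging", bytes: n }; // we are at the insertion point
    }
    // never read past the next insertion point
    return { source: "writing", bytes: Math.min(n, bytesToNextIndex) };
}

const beforeIndex = planRead(16, 0, 5); // { source: "writing", bytes: 5 }
const atIndex = planRead(16, 5, 5);     // { source: "merging", bytes: 16 }
const noMerges = planRead(16, 7, END);  // { source: "writing", bytes: 16 }
console.log(beforeIndex, atIndex, noMerges);
```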

readWritingState reads the writing state and updates the index:

/**
 * Method to read from the writing state
 *
 * @param n maximum number of bytes to be read
 * @returns number of bytes written.
 */
private readWritingState(n: number): number {
    const bytesWritten = this.readState(n, this.writingState);
    this.index += bytesWritten;
    return bytesWritten;
}

Merging

To select the streams to merge, we use a generator function. The generator function yields an index and a stream to merge at that index:

export interface MergingStream { index: number; stream: Readable; }
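
As an illustration, a generator could be built from a plain list of insertions. mergingStreams is a hypothetical helper. It sorts by index because a merging stream whose index lies before the current index is rejected with an error.

```typescript
import { Readable } from "stream";

interface MergingStream { index: number; stream: Readable; }

// Hypothetical helper: yield the insertions in ascending index order.
function* mergingStreams(insertions: Array<[number, string]>): IterableIterator<MergingStream> {
    const sorted = [...insertions].sort((a, b) => a[0] - b[0]);
    for (const [index, text] of sorted) {
        // Readable.from() turns the one-element array into a stream that emits one Buffer.
        yield { index, stream: Readable.from([Buffer.from(text)]) };
    }
}

const generator = mergingStreams([[7, "world"], [0, "hello"]]);
const first = generator.next().value as MergingStream;
const second = generator.next().value as MergingStream;
console.log(first.index, second.index); // prints: 0 7
```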

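In doRead, getNextMergingIndex() is called. This function returns the index of the next MergingStream. If there is no current mergingStream, the generator is called to fetch a new one. If there is no new merging stream either, we simply return END.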

/**
 * Method to get the next merging index.
 *
 * Also fetches the next merging stream if merging stream is null
 *
 * @returns the next merging index, or Merge.END if there is no new mergingStream
 * @throws Error when invalid MergingStream is returned by streamGenerator
 */
private getNextMergingIndex(): number {
    if (!this.mergingStream) {
        this.setNewMergeStream(this.streamGenerator.next().value);
        if (!this.mergingStream) {
            return Merge.END;
        }
    }
    return this.mergingStream.index;
}

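In setNewMergeStream we create a new Writable into which we can pipe our new merging stream. For this Writable we need to handle the write callback, which writes to our state, and the final callback, which handles the last chunk. We also should not forget to reset our state.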

/**
 * Method to set the new merging stream
 *
 * @throws Error when mergingStream has an index less than the current index
 */
private setNewMergeStream(mergingStream?: MergingStream): void {
    if (this.mergingStream) {
        throw new Error('There already is a merging stream');
    }
    // Set a new merging stream
    this.mergingStream = mergingStream;
    if (mergingStream == null || mergingStream.index === Merge.END) {
        // set new state
        this.mergingState = newMergingState(this.writableHighWaterMark);
        // We're done, for now...
        // mergingStream will be handled further once nextMainStream() is called
        return;
    }
    if (mergingStream.index < this.index) {
        throw new Error('Cannot merge at ' + mergingStream.index + ' because current index is ' + this.index);
    }
    // Create a new writable our new mergingStream can write to
    this.mergeWriteStream = new Writable({
        // Create a write callback for our new mergingStream
        write: (chunk, encoding, cb) => this.writeMerge(mergingStream.stream, chunk, encoding, cb),
        final: (cb: StreamCallback) => {
            this.onMergeEnd(mergingStream.stream, cb);
        },
    });
    // Create a new mergingState for our new merging stream
    this.mergingState = newMergingState(this.mergeWriteStream.writableHighWaterMark);
    // Pipe our new merging stream to our sink
    mergingStream.stream.pipe(this.mergeWriteStream);
}

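Finalizing

The last step in the process is handling our final chunks, so that we know when to stop merging and can send an end chunk. In our main read loop we keep reading until our doRead() method returns 0 twice in a row, or until it has filled up our read buffer. Once that happens, we end the read loop and check our states to see whether they have finished.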

public _read(size: number): void {
    if (this.finished) {
        // we've finished, there is nothing left to read
        return;
    }
    this.mergeSync = false;
    let bytesRead = 0;
    do {
        const availableSpace = this.readableHighWaterMark - this.readableLength;
        bytesRead = 0;
        READ_LOOP: while (bytesRead < availableSpace && !this.finished) {
            try {
                const result = this.doRead(availableSpace - bytesRead);
                if (result === 0) {
                    // either there is nothing in our buffers
                    // or our states are outdated (since they get updated in doRead)
                    break READ_LOOP;
                }
                bytesRead += result;
            } catch (error) {
                this.emit('error', error);
                this.push(null);
                this.finished = true;
            }
        }
    } while (bytesRead > 0 && !this.finished);
    this.handleFinished();
}

Then, in handleFinished(), we check our states.

private handleFinished(): void {
    if (this.finished) {
        // merge stream has finished, so nothing to check
        return;
    }
    if (this.isStateFinished(this.mergingState)) {
        this.stateCallbackAndSet(this.mergingState, null);
        // set our mergingStream to null, to indicate we need a new one
        // which will be fetched by getNextMergingIndex()
        this.mergingStream = null;
        this.mergeNextTick();
    }
    if (this.isStateFinished(this.writingState)) {
        this.stateCallbackAndSet(this.writingState, null);
        this.handleMainFinish(); // checks if there are still mergingStreams left, and sets finished flag
        this.mergeNextTick();
    }
}

isStateFinished() checks whether our state has the finalizing flag set and whether the queue size equals 0.

/**
 * Method to check if a specific state has completed
 * @param state the state to check
 * @returns true if the state has completed
 */
private isStateFinished(state: MergingState): boolean {
    if (!state || !state.finalizing || state.size > 0) {
        return false;
    }
    return true;
}

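For a merging stream, the finalizing flag is set in the final callback of its Writable. For our main stream we have to take a slightly different approach, since we have little control over when the stream ends: by default, the readable calls end on our writable. We want to remove this behavior so that we can decide ourselves when to finish the stream. This might cause some issues when other end listeners are set, but for most use cases it should be fine.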

private onPipe(readable: Readable): void {
    // prevent our stream from being closed prematurely and unpipe it instead
    readable.removeAllListeners('end');  // Note: will cause issues if another end listener is set
    readable.once('end', () => {
        this.finalizeState(this.writingState);
        readable.unpipe();
    });
}

finalizeState() sets the flag and the callback to end the stream.

/**
 * Method to put a state in finalizing mode
 *
 * Finalizing mode: the last chunk has been received, when size is 0
 * the stream should be removed.
 *
 * @param state the state which should be put in finalizing mode
 *
 */
private finalizeState(state: MergingState, cb?: StreamCallback): void {
    state.finalizing = true;
    this.stateCallbackAndSet(state, cb);
    this.mergeNextTick();
}

And that is how you merge multiple streams into one single sink.

TL;DR: Complete code

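This code has been fully tested with my Jest test suite on multiple edge cases, and it has a few more features than explained here, such as appending streams and merging into an appended stream by providing Merge.END as the index.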

Test result

You can see the tests I have run here. If I forgot any, send me a message and I may write another test for it.

MergeStream
    ✓ should throw an error when nextStream is not implemented (9ms)
    ✓ should throw an error when nextStream returns a stream with lower index (4ms)
    ✓ should reset index after new main stream (5ms)
    ✓ should write a single stream normally (50ms)
    ✓ should be able to merge a stream (2ms)
    ✓ should be able to append a stream on the end (1ms)
    ✓ should be able to merge large streams into a smaller stream (396ms)
    ✓ should be able to merge at the correct index (2ms)

Usage

const mergingStream = new Merge({
    *nextStream(): IterableIterator<MergingStream> {
        for (let i = 0; i < 10; i++) {
            const stream = new Readable();
            stream.push(i.toString());
            stream.push(null);
            yield {index: i * 2, stream};
        }
    },
});
const template = new Readable();
template.push(', , , , , , , , , ');
template.push(null);
template.pipe(mergingStream).pipe(getSink());

The result in our sink would be

0, 1, 2, 3, 4, 5, 6, 7, 8, 9
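
getSink() is not shown in the usage example. One possible shape for it is a Writable that simply collects what it receives. CollectingSink and its text() method are hypothetical, and this is only a sketch of what such a sink could look like.

```typescript
import { Writable } from "stream";

// Hypothetical sink that keeps every chunk it receives in memory.
class CollectingSink extends Writable {
    private readonly chunks: Buffer[] = [];

    _write(chunk: Buffer, _encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
        this.chunks.push(chunk);
        callback(); // signal that we are ready for the next chunk
    }

    // Everything received so far, as one string.
    text(): string {
        return Buffer.concat(this.chunks).toString();
    }
}

function getSink(): CollectingSink {
    return new CollectingSink();
}

const sink = getSink();
sink.write("0, "); // strings are converted to Buffers before reaching _write
sink.write("1");
console.log(sink.text()); // prints: 0, 1
```

A sink like this holds the entire output in memory, so it is only suitable for tests and small examples, not for the large streams this approach is meant for.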

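Final thoughts

This is not the most time-efficient way of doing it, since we only manage one merging buffer at a time, so there is a lot of waiting. For my use case that is fine. What I care about is that it does not eat up my memory, and this solution works for me. But there is definitely room for optimization. The complete code has some extra features that are not fully explained here, such as appending streams and merging into an appended stream. They are explained with comments, though.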
