如何在 Javascript 中将两个 ReadableStreams 通过管道传输到一个 WritableStream 中?

asd*_*159 2 javascript pipeline tee whatwg-streams-api

我有两个 ReadableStream,我想将它们通过管道传输到一个 WritableStream,其中通过 ReadableStream 的任何数据都会立即直接进入 WritableStream。

我可以做相反的事情,通过使用ReadableStream.prototype.tee()将一个 ReadableStream 一分为二,但我不知道如何将两个合并为一个。

const textarea = document.querySelector("textarea");


// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
  const sayMom = () => controller.enqueue("Mom! ");
  setInterval(sayMom, 1000);
}});

// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
  const sayLois = () => controller.enqueue("Lois! ");
  setInterval(sayLois, 700);
}});

// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });


momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
Run Code Online (Sandbox Code Playgroud)
<textarea readonly></textarea>
Run Code Online (Sandbox Code Playgroud)

Ry-*_*Ry- 5

手动地,通过比较每个读者最近的阅读来生成整体阅读并根据需要启动这些阅读:

const never = new Promise(() => {});

const mergeStreams = streams => {
    const readers = streams.map(s => s.getReader());
    const reads = streams.map(() => null);
    const dones = [];
    const allDone = Promise.all(streams.map(s => new Promise(resolve => {
        dones.push(resolve);
    })));

    return new ReadableStream({
        start: controller => {
            allDone.then(() => {
                controller.close();
            });
        },
        pull: controller =>
            Promise.race(
                readers.map((r, i) =>
                    reads[i] ??= r.read().then(({value, done}) => {
                        if (done) {
                            dones[i]();
                            return never;
                        }

                        controller.enqueue(value);
                        reads[i] = null;
                    })
                )
            ),
        cancel: reason => {
            for (const reader of readers) {
                reader.cancel(reason);
            }
        },
    });
};
Run Code Online (Sandbox Code Playgroud)

const never = new Promise(() => {});

const mergeStreams = streams => {
    const readers = streams.map(s => s.getReader());
    const reads = streams.map(() => null);
    const dones = [];
    const allDone = Promise.all(streams.map(s => new Promise(resolve => {
        dones.push(resolve);
    })));

    return new ReadableStream({
        start: controller => {
            allDone.then(() => {
                controller.close();
            });
        },
        pull: controller =>
            Promise.race(
                readers.map((r, i) =>
                    reads[i] ??= r.read().then(({value, done}) => {
                        if (done) {
                            dones[i]();
                            return never;
                        }

                        controller.enqueue(value);
                        reads[i] = null;
                    })
                )
            ),
        cancel: reason => {
            for (const reader of readers) {
                reader.cancel(reason);
            }
        },
    });
};
Run Code Online (Sandbox Code Playgroud)
const textarea = document.querySelector("textarea");


const never = new Promise(() => {});

const mergeStreams = streams => {
    const readers = streams.map(s => s.getReader());
    const reads = streams.map(() => null);
    const dones = [];
    const allDone = Promise.all(streams.map(s => new Promise(resolve => {
        dones.push(resolve);
    })));

    return new ReadableStream({
        start: controller => {
            allDone.then(() => {
                controller.close();
            });
        },
        pull: controller =>
            Promise.race(
                readers.map((r, i) =>
                    reads[i] ??= r.read().then(({value, done}) => {
                        if (done) {
                            dones[i]();
                            return never;
                        }

                        controller.enqueue(value);
                        reads[i] = null;
                    })
                )
            ),
        cancel: reason => {
            for (const reader of readers) {
                reader.cancel(reason);
            }
        },
    });
};


// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
  const sayMom = () => controller.enqueue("Mom! ");
  setInterval(sayMom, 1000);
}});

// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
  const sayLois = () => controller.enqueue("Lois! ");
  setInterval(sayLois, 700);
}});

// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });


mergeStreams([
  momReadableStream,
  loisReadableStream,
]).pipeTo(writableStream).catch(console.error);
Run Code Online (Sandbox Code Playgroud)