如何将来自多个api请求的多个可读流传递到单个可写流?

use*_*287 10 fs node.js express ibm-watson node-streams

-期望的行为
-实际行为
-我尝试过的内容
-再现步骤
-研究


期望的行为

从多个api请求接收的多个可读流通过管道传输到单个可写流。

api响应来自ibm-watson的textToSpeech.synthesize()方法。

需要多个请求的原因是因为该服务5KB对文本输入有限制。

因此18KB,例如一个字符串,需要四个请求才能完成。

实际行为

可写流文件不完整且出现乱码。

该应用程序似乎“挂起”。

当我尝试.mp3在音频播放器中打开不完整的文件时,它说文件已损坏。

打开和关闭文件的过程似乎会增加文件的大小-就像打开文件一样,它会促使更多数据流入其中。

对于较大的输入(例如,四个4000字节或更少的字符串),不良行为更加明显。

我尝试过的

我尝试了几种方法,使用npm包Combined-streamCombined-stream2multistreamarchiver 将可读流传递给单个可写流或多个可写流,它们都会导致文件不完整。我最后一次尝试不使用任何软件包,并在Steps To Reproduce下面的部分中显示。

因此,我在质疑应用程序逻辑的每个部分:

01. Watson文本对语音API请求的响应类型是什么?

语音文档文本,表示api响应类型为:

Response type: NodeJS.ReadableStream|FileObject|Buffer
Run Code Online (Sandbox Code Playgroud)

我很困惑,响应类型是三种可能的事情之一。

在所有尝试中,我一直假设它是一个readable stream

02. 我可以在一个地图函数中发出多个api请求吗?

03. 我可以在中包装每个请求promise()并解决response吗?

04.是否 可以将结果数组分配给promises变量?

05. 我可以声明var audio_files = await Promise.all(promises)吗?

06. 声明之后,所有答复都“完成”了吗?

07. 如何正确地将每个响应传递给可写流?

08. 如何检测所有管道何时完成,以便可以将文件发送回客户端?

对于问题2-6,我假设答案为“是”。

我认为我的失败与问题7和8有关。

重现步骤

可以用四个随机生成的文本串的阵列与相应的字节大小测试此代码3975386339743629字节- 这里是阵列的引擎收录

Response type: NodeJS.ReadableStream|FileObject|Buffer
Run Code Online (Sandbox Code Playgroud)

负责人举例所示:

// route handler
app.route("/api/:api_version/tts")
    .get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

    var query_parameters = req.query;

    var file_name = query_parameters.file_name;
    var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

    var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
    var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

    // for each string in an array, send it to the watson api  
    var promises = text_string_array.map(text_string => {

        return new Promise((resolve, reject) => {

            // credentials
            var textToSpeech = new TextToSpeechV1({
                iam_apikey: iam_apikey,
                url: tts_service_url
            });

            // params  
            var synthesizeParams = {
                text: text_string,
                accept: 'audio/mp3',
                voice: 'en-US_AllisonV3Voice'
            };

            // make request  
            textToSpeech.synthesize(synthesizeParams, (err, audio) => {
                if (err) {
                    console.log("synthesize - an error occurred: ");
                    return reject(err);
                }
                resolve(audio);
            });

        });
    });

    try {
        // wait for all responses
        var audio_files = await Promise.all(promises);
        var audio_files_length = audio_files.length;

        var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

        audio_files.forEach((audio, index) => {

            // if this is the last value in the array, 
            // pipe it to write_stream, 
            // when finished, the readable stream will emit 'end' 
            // then the .end() method will be called on write_stream  
            // which will trigger the 'finished' event on the write_stream    
            if (index == audio_files_length - 1) {
                audio.pipe(write_stream);
            }
            // if not the last value in the array, 
            // pipe to write_stream and leave open 
            else {
                audio.pipe(write_stream, { end: false });
            }

        });

        write_stream.on('finish', function() {

            // download the file (using absolute_path)  
            res.download(`${absolute_path}.mp3`, (err) => {
                if (err) {
                    console.log(err);
                }
                // delete the file (using relative_path)  
                fs.unlink(`${relative_path}.mp3`, (err) => {
                    if (err) {
                        console.log(err);
                    }
                });
            });

        });


    } catch (err) {
        console.log("there was an error getting tts");
        console.log(err);
    }

}
Run Code Online (Sandbox Code Playgroud)

据我所知,这似乎对单个请求有效,但对多个请求却无效。

研究

关于可读和可写流,可读流模式(流动和暂停),“数据”,“结束”,“排空”和“完成”事件,pipe(),fs.createReadStream()和fs.createWriteStream()


几乎所有Node.js应用程序,无论多么简单,都以某种方式使用流...

textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });
Run Code Online (Sandbox Code Playgroud)

https://nodejs.org/api/stream.html#stream_api_for_stream_consumers


可读流具有两种主要模式,这些模式会影响我们的消费方式...它们可以处于paused模式下,也可以处于模式flowing下。所有可读流开始在默认情况下,暂停模式,但他们可以很容易地切换到flowing再换paused需要的时候......只是增加一个data事件处理程序切换暂停流进flowing模式和移除data事件处理程序切换流回到paused模式。

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


这是可与可读和可写流一起使用的重要事件和功能的列表

在此处输入图片说明

可读流上最重要的事件是:

data情况下,每当流通过数据块给消费者的被发射的end情况下,当没有更多的数据从流中饮用被发射。

可写流上最重要的事件是:

drain事件,这是在该可写流可以接收更多的数据的信号。finish当所有数据都已刷新到基础系统时发出此事件。

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


.pipe()负责侦听来自的“数据”和“结束”事件fs.createReadStream()

https://github.com/substack/stream-handbook#why-you-should-use-streams


.pipe() 只是一个函数,它采用可读的源流src并将输出挂钩到目标可写流 dst

https://github.com/substack/stream-handbook#pipe


pipe()方法的返回值是目标流

https://flaviocopes.com/nodejs-streams/#pipe


默认情况下,当源流发出时,在目标流上调用stream.end(),因此目标不再可写。要禁用此默认行为,可以将该选项作为传递,从而使目标流保持打开状态:WritableReadable'end'endfalse

https://nodejs.org/api/stream.html#stream_visible_pipe_destination_options


在调用方法'finish'之后,将发出该事件stream.end(),并且所有数据都已刷新到基础系统。

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream

let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});

// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
Run Code Online (Sandbox Code Playgroud)

https://nodejs.org/api/stream.html#stream_event_finish


如果您尝试读取多个文件并将其通过管道传输到可写流,则必须将每个文件通过管道传输到可写流并end: false在执行操作时通过,因为默认情况下,当没有更多数据时,可读流将结束可写流被阅读。这是一个例子:

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
Run Code Online (Sandbox Code Playgroud)

/sf/answers/2164137391/


您想将第二个读取添加到事件监听器中,以完成第一个读取...

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
Run Code Online (Sandbox Code Playgroud)

/sf/answers/1962348811/


节点流简史-部分一个2


相关的Google搜索:

如何将多个可读流传输到单个可写流?节点js

涉及相同或相似主题的问题,没有权威性答案(或者可能“过时”):

如何将多个ReadableStreams传递到单个WriteStream?

通过不同的可读流两次管道传输到同一可写流

将多个文件传送到一个响应

从两个管道流创建Node.js流

B M*_*B M 5

这里要解决的核心问题是异步性。您几乎已经完成了:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着data块将从不同的音频流中随机流动 - 即使您的end事件也会超过pipes 而不会end过早关闭目标流,这可能解释了为什么它在您重新打开它后会增加。

你想要的是按顺序管道它们 - 你甚至在引用时发布了解决方案

您想将第二次读取添加到事件监听器中,以便第一次读取完成......

或作为代码:

a.pipe(c, { end:false });
a.on('end', function() {
  b.pipe(c);
}
Run Code Online (Sandbox Code Playgroud)

这会将源流按顺序传输到目标流中。

使用您的代码意味着将audio_files.forEach循环替换为:

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
    const isLastIndex = index == audio_files_length - 1;
    audio.pipe(write_stream, { end: isLastIndex });
    return new Promise(resolve => audio.on('end', resolve));
});
Run Code Online (Sandbox Code Playgroud)

请注意此处bluebird.js mapSeries的用法。

关于您的代码的进一步建议:

  • 你应该考虑使用lodash.js
  • 你应该使用const&let而不是var并考虑使用camelCase
  • 当您注意到“它适用于一个事件,但无法处理多个事件”时,请始终思考:异步性、排列、竞争条件。

进一步阅读,组合本机节点流的限制:https ://github.com/nodejs/node/issues/93


Ter*_*nox 3

我会在这里给出我的两分钱,因为我最近看到了类似的问题!根据我的测试和研究,您可以将两个 .mp3 / .wav 流合并为一个。这会导致文件出现您提到的明显问题,例如截断、故障等。

我相信您可以正确组合音频流的唯一方法是使用旨在连接声音文件/数据的模块。

我获得的最佳结果是将音频合成为单独的文件,然后像这样组合:

function combineMp3Files(files, outputFile) {
    const ffmpeg = require("fluent-ffmpeg");
    const combiner = ffmpeg().on("error", err => {
        console.error("An error occurred: " + err.message);
    })
    .on("end", () => {
        console.log('Merge complete');
    });

    // Add in each .mp3 file.
    files.forEach(file => {
        combiner.input(file)
    });

    combiner.mergeToFile(outputFile); 
}
Run Code Online (Sandbox Code Playgroud)

这使用了node- Fluent-ffmpeg库,需要安装ffmpeg

除此之外,我建议您询问 IBM 支持人员(因为正如您所说,文档似乎没有指出这一点)API 调用者应如何组合合成音频,因为您的用例将非常常见。

要创建文本文件,我执行以下操作:

// Switching to audio/webm and the V3 voices.. much better output 
function synthesizeText(text) {
    const synthesizeParams = {
        text: text,
        accept: 'audio/webm',
        voice: 'en-US_LisaV3Voice'
    };
    return textToSpeech.synthesize(synthesizeParams);
}


async function synthesizeTextChunksSeparateFiles(text_chunks) {
    const audioArray = await Promise.all(text_chunks.map(synthesizeText));
    console.log(`synthesizeTextChunks: Received ${audioArray.length} result(s), writing to separate files...`);
    audioArray.forEach((audio, index) => {
        audio.pipe(fs.createWriteStream(`audio-${index}.mp3`));
    });
}
Run Code Online (Sandbox Code Playgroud)

然后像这样组合:

function combineMp3Files(files, outputFile) {
    const ffmpeg = require("fluent-ffmpeg");
    const combiner = ffmpeg().on("error", err => {
        console.error("An error occurred: " + err.message);
    })
    .on("end", () => {
        console.log('Merge complete');
    });

    // Add in each .mp3 file.
    files.forEach(file => {
        combiner.input(file)
    });

    combiner.mergeToFile(outputFile); 
}
Run Code Online (Sandbox Code Playgroud)

我应该指出,我分两个单独的步骤执行此操作(等待几百毫秒也可以),但等待各个文件被写入,然后将它们组合起来应该很容易。

这是一个可以执行此操作的函数:

async function synthesizeTextChunksThenCombine(text_chunks, outputFile) {
    const audioArray = await Promise.all(text_chunks.map(synthesizeText));
    console.log(`synthesizeTextChunks: Received ${audioArray.length} result(s), writing to separate files...`);
    let writePromises = audioArray.map((audio, index) => {
        return new Promise((resolve, reject) => {
            audio.pipe(fs.createWriteStream(`audio-${index}.mp3`).on('close', () => {   
                resolve(`audio-${index}.mp3`);
            }));
        })
    });
    let files = await Promise.all(writePromises);
    console.log('synthesizeTextChunksThenCombine: Separate files: ', files);
    combineMp3Files(files, outputFile);
}
Run Code Online (Sandbox Code Playgroud)