use*_*287 10 fs node.js express ibm-watson node-streams
-期望的行为
-实际行为
-我尝试过的内容
-再现步骤
-研究
期望的行为
从多个api请求接收的多个可读流通过管道传输到单个可写流。
api响应来自ibm-watson的textToSpeech.synthesize()方法。
需要多个请求的原因是因为该服务5KB
对文本输入有限制。
因此18KB
,例如一个字符串,需要四个请求才能完成。
实际行为
可写流文件不完整且出现乱码。
该应用程序似乎“挂起”。
当我尝试.mp3
在音频播放器中打开不完整的文件时,它说文件已损坏。
打开和关闭文件的过程似乎会增加文件的大小-就像打开文件一样,它会促使更多数据流入其中。
对于较大的输入(例如,四个4000字节或更少的字符串),不良行为更加明显。
我尝试过的
我尝试了几种方法,使用npm包Combined-stream,Combined-stream2,multistream和archiver 将可读流传递给单个可写流或多个可写流,它们都会导致文件不完整。我最后一次尝试不使用任何软件包,并在Steps To Reproduce
下面的部分中显示。
因此,我在质疑应用程序逻辑的每个部分:
01. Watson文本对语音API请求的响应类型是什么?
Response type: NodeJS.ReadableStream|FileObject|Buffer
Run Code Online (Sandbox Code Playgroud)
我很困惑,响应类型是三种可能的事情之一。
在所有尝试中,我一直假设它是一个readable stream
。
02. 我可以在一个地图函数中发出多个api请求吗?
03. 我可以在中包装每个请求
promise()
并解决response
吗?04.是否 可以将结果数组分配给
promises
变量?05. 我可以声明
var audio_files = await Promise.all(promises)
吗?06. 声明之后,所有答复都“完成”了吗?
07. 如何正确地将每个响应传递给可写流?
08. 如何检测所有管道何时完成,以便可以将文件发送回客户端?
对于问题2-6,我假设答案为“是”。
我认为我的失败与问题7和8有关。
重现步骤
可以用四个随机生成的文本串的阵列与相应的字节大小测试此代码3975
,3863
,3974
和3629
字节- 这里是阵列的引擎收录。
Response type: NodeJS.ReadableStream|FileObject|Buffer
Run Code Online (Sandbox Code Playgroud)
该负责人举例所示:
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
Run Code Online (Sandbox Code Playgroud)
据我所知,这似乎对单个请求有效,但对多个请求却无效。
研究
关于可读和可写流,可读流模式(流动和暂停),“数据”,“结束”,“排空”和“完成”事件,pipe(),fs.createReadStream()和fs.createWriteStream()
几乎所有Node.js应用程序,无论多么简单,都以某种方式使用流...
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
Run Code Online (Sandbox Code Playgroud)
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
可读流具有两种主要模式,这些模式会影响我们的消费方式...它们可以处于
paused
模式下,也可以处于模式flowing
下。所有可读流开始在默认情况下,暂停模式,但他们可以很容易地切换到flowing
再换paused
需要的时候......只是增加一个data
事件处理程序切换暂停流进flowing
模式和移除data
事件处理程序切换流回到paused
模式。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
这是可与可读和可写流一起使用的重要事件和功能的列表
可读流上最重要的事件是:
的
data
情况下,每当流通过数据块给消费者的被发射的end
情况下,当没有更多的数据从流中饮用被发射。可写流上最重要的事件是:
的
drain
事件,这是在该可写流可以接收更多的数据的信号。finish
当所有数据都已刷新到基础系统时发出此事件。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()
负责侦听来自的“数据”和“结束”事件fs.createReadStream()
。
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe()
只是一个函数,它采用可读的源流src并将输出挂钩到目标可写流dst
https://github.com/substack/stream-handbook#pipe
该
pipe()
方法的返回值是目标流
https://flaviocopes.com/nodejs-streams/#pipe
默认情况下,当源流发出时,在目标流上调用stream.end(),因此目标不再可写。要禁用此默认行为,可以将该选项作为传递,从而使目标流保持打开状态:
Writable
Readable
'end'
end
false
https://nodejs.org/api/stream.html#stream_visible_pipe_destination_options
在调用方法
'finish'
之后,将发出该事件stream.end()
,并且所有数据都已刷新到基础系统。
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
Run Code Online (Sandbox Code Playgroud)
https://nodejs.org/api/stream.html#stream_event_finish
如果您尝试读取多个文件并将其通过管道传输到可写流,则必须将每个文件通过管道传输到可写流并
end: false
在执行操作时通过,因为默认情况下,当没有更多数据时,可读流将结束可写流被阅读。这是一个例子:
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
Run Code Online (Sandbox Code Playgroud)
您想将第二个读取添加到事件监听器中,以完成第一个读取...
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
Run Code Online (Sandbox Code Playgroud)
相关的Google搜索:
如何将多个可读流传输到单个可写流?节点js
涉及相同或相似主题的问题,没有权威性答案(或者可能“过时”):
这里要解决的核心问题是异步性。您几乎已经完成了:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着data
块将从不同的音频流中随机流动 - 即使您的end
事件也会超过pipe
s 而不会end
过早关闭目标流,这可能解释了为什么它在您重新打开它后会增加。
你想要的是按顺序管道它们 - 你甚至在引用时发布了解决方案
您想将第二次读取添加到事件监听器中,以便第一次读取完成......
或作为代码:
a.pipe(c, { end:false });
a.on('end', function() {
b.pipe(c);
}
Run Code Online (Sandbox Code Playgroud)
这会将源流按顺序传输到目标流中。
使用您的代码意味着将audio_files.forEach
循环替换为:
await Bluebird.mapSeries(audio_files, async (audio, index) => {
const isLastIndex = index == audio_files_length - 1;
audio.pipe(write_stream, { end: isLastIndex });
return new Promise(resolve => audio.on('end', resolve));
});
Run Code Online (Sandbox Code Playgroud)
请注意此处bluebird.js mapSeries的用法。
关于您的代码的进一步建议:
const
&let
而不是var
并考虑使用camelCase
进一步阅读,组合本机节点流的限制:https ://github.com/nodejs/node/issues/93
我会在这里给出我的两分钱,因为我最近看到了类似的问题!根据我的测试和研究,您可以将两个 .mp3 / .wav 流合并为一个。这会导致文件出现您提到的明显问题,例如截断、故障等。
我相信您可以正确组合音频流的唯一方法是使用旨在连接声音文件/数据的模块。
我获得的最佳结果是将音频合成为单独的文件,然后像这样组合:
function combineMp3Files(files, outputFile) {
const ffmpeg = require("fluent-ffmpeg");
const combiner = ffmpeg().on("error", err => {
console.error("An error occurred: " + err.message);
})
.on("end", () => {
console.log('Merge complete');
});
// Add in each .mp3 file.
files.forEach(file => {
combiner.input(file)
});
combiner.mergeToFile(outputFile);
}
Run Code Online (Sandbox Code Playgroud)
这使用了node- Fluent-ffmpeg库,需要安装ffmpeg。
除此之外,我建议您询问 IBM 支持人员(因为正如您所说,文档似乎没有指出这一点)API 调用者应如何组合合成音频,因为您的用例将非常常见。
要创建文本文件,我执行以下操作:
// Switching to audio/webm and the V3 voices.. much better output
function synthesizeText(text) {
const synthesizeParams = {
text: text,
accept: 'audio/webm',
voice: 'en-US_LisaV3Voice'
};
return textToSpeech.synthesize(synthesizeParams);
}
async function synthesizeTextChunksSeparateFiles(text_chunks) {
const audioArray = await Promise.all(text_chunks.map(synthesizeText));
console.log(`synthesizeTextChunks: Received ${audioArray.length} result(s), writing to separate files...`);
audioArray.forEach((audio, index) => {
audio.pipe(fs.createWriteStream(`audio-${index}.mp3`));
});
}
Run Code Online (Sandbox Code Playgroud)
然后像这样组合:
function combineMp3Files(files, outputFile) {
const ffmpeg = require("fluent-ffmpeg");
const combiner = ffmpeg().on("error", err => {
console.error("An error occurred: " + err.message);
})
.on("end", () => {
console.log('Merge complete');
});
// Add in each .mp3 file.
files.forEach(file => {
combiner.input(file)
});
combiner.mergeToFile(outputFile);
}
Run Code Online (Sandbox Code Playgroud)
我应该指出,我分两个单独的步骤执行此操作(等待几百毫秒也可以),但等待各个文件被写入,然后将它们组合起来应该很容易。
这是一个可以执行此操作的函数:
async function synthesizeTextChunksThenCombine(text_chunks, outputFile) {
const audioArray = await Promise.all(text_chunks.map(synthesizeText));
console.log(`synthesizeTextChunks: Received ${audioArray.length} result(s), writing to separate files...`);
let writePromises = audioArray.map((audio, index) => {
return new Promise((resolve, reject) => {
audio.pipe(fs.createWriteStream(`audio-${index}.mp3`).on('close', () => {
resolve(`audio-${index}.mp3`);
}));
})
});
let files = await Promise.all(writePromises);
console.log('synthesizeTextChunksThenCombine: Separate files: ', files);
combineMp3Files(files, outputFile);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
730 次 |
最近记录: |