我有一个很长的期货清单,我想使用Stream::buffer_unordered/运行它们Stream::buffered。我将这个流合并到一个 future 中for_each,然后用 Tokio 执行它。其中一个期货返回错误是很常见的。根据文档,for_each返回错误时将停止。
当返回这些错误时,如何忽略或只打印一条消息并继续执行后续的 future?
这是与我的情况类似的一般代码:
use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;
fn main() {
let queries: Vec<u32> = (0..10).collect();
let futures = queries.into_iter().map(move |num| {
println!("Started {}", num);
// Maybe throw error
let future = match num % 3 {
0 => ok::<u32, u32>(num),
_ => err::<u32, u32>(num)
};
future
});
let stream = stream::iter_ok(futures);
let num_workers = 8;
let future = stream
.buffer_unordered(num_workers)
.map_err(|err| {
println!("Error on {:?}", err);
})
.for_each(|n| {
println!("Success on {:?}", n);
Ok(())
});
tokio::runtime::run(future);
}
Run Code Online (Sandbox Code Playgroud)
如果您尝试这个示例,则 future 队列将在Err抛出 an 时提前停止执行。
Stream::map_err\xe2\x80\x94 提供了错误值,它可以转换类型,但会将其保留为错误。
Stream::or_else\xe2\x80\x94 提供了错误值,它可以将错误转换为成功,而成功值保持不变。
Stream::then\xe2\x80\x94 提供了成功和错误值,可以做任何你想做的事情。
Stream::map无法让您将错误转化为成功,因此它没有用。Stream::or_else 确实提供了这种能力,但是当您可以将错误类型转换为成功类型时使用它。仅有的Stream::then让您能够同时转换两种类型。
Stream::flatten可用于将流的流转换为单个流。
结合这一事实Result,您可以创建:
stream\n .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n .flatten()\nRun Code Online (Sandbox Code Playgroud)\n\n无论流的项目是Okor Err,我们都会将其转换为迭代器并从中创建流。然后我们压平溪流。
如果你想打印出错误,我会使用Stream::inspect_err:
stream.inspect_err(|err| println!("Error on {:?}", err))\nRun Code Online (Sandbox Code Playgroud)\n\n完整代码:
\n\nuse futures::{\n future,\n stream::{self, Stream},\n}; // 0.1.25;\nuse tokio; // 0.1.14\n\nfn main() {\n let stream = stream::iter_ok({\n (0..10).map(|num| {\n println!("Started {}", num);\n match num % 3 {\n 0 => future::ok(num),\n _ => future::err(num),\n }\n })\n })\n .buffer_unordered(2);\n\n let stream = stream\n .inspect_err(|err| println!("Error on {:?}", err))\n .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n .flatten();\n\n tokio::run({\n stream.for_each(|n| {\n println!("Success on {:?}", n);\n Ok(())\n })\n });\n}\nRun Code Online (Sandbox Code Playgroud)\n\nstream\n .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n .flatten()\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
2517 次 |
| 最近记录: |