处理流时如何删除或忽略错误?

jee*_*cat 4 future rust

我有一个很长的期货清单,我想使用Stream::buffer_unordered/运行它们Stream::buffered。我将这个流合并到一个 future 中for_each,然后用 Tokio 执行它。其中一个期货返回错误是很常见的。根据文档,for_each返回错误时将停止。

当返回这些错误时,如何忽略或只打印一条消息并继续执行后续的 future?

这是与我的情况类似的一般代码:

use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num)
        };
        future
    });

    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });

    tokio::runtime::run(future);
}
Run Code Online (Sandbox Code Playgroud)

铁锈游乐场

如果您尝试这个示例,则 future 队列将在Err抛出 an 时提前停止执行。

She*_*ter 6

    \n
  • Stream::map_err\xe2\x80\x94 提供了错误值,它可以转换类型,但会将其保留为错误。

  • \n
  • Stream::or_else\xe2\x80\x94 提供了错误值,它可以将错误转换为成功,而成功值保持不变。

  • \n
  • Stream::then\xe2\x80\x94 提供了成功和错误值,可以做任何你想做的事情。

  • \n
\n\n

Stream::map无法让您将错误转化为成功,因此它没有用。Stream::or_else 确实提供了这种能力,但是当您可以将错误类型转换为成功类型时使用它。仅有的Stream::then让您能够同时转换两种类型。

\n\n

Stream::flatten可用于将流的流转换为单个流。

\n\n

结合这一事实Result,您可以创建:

\n\n
stream\n    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n    .flatten()\n
Run Code Online (Sandbox Code Playgroud)\n\n

无论流的项目是Okor Err,我们都会将其转换为迭代器并从中创建流。然后我们压平溪流。

\n\n

如果你想打印出错误,我会使用Stream::inspect_err

\n\n
stream.inspect_err(|err| println!("Error on {:?}", err))\n
Run Code Online (Sandbox Code Playgroud)\n\n

完整代码:

\n\n
use futures::{\n    future,\n    stream::{self, Stream},\n}; // 0.1.25;\nuse tokio; // 0.1.14\n\nfn main() {\n    let stream = stream::iter_ok({\n        (0..10).map(|num| {\n            println!("Started {}", num);\n            match num % 3 {\n                0 => future::ok(num),\n                _ => future::err(num),\n            }\n        })\n    })\n    .buffer_unordered(2);\n\n    let stream = stream\n        .inspect_err(|err| println!("Error on {:?}", err))\n        .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n        .flatten();\n\n    tokio::run({\n        stream.for_each(|n| {\n            println!("Success on {:?}", n);\n            Ok(())\n        })\n    });\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n
stream\n    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))\n    .flatten()\n
Run Code Online (Sandbox Code Playgroud)\n