How do I convert an iterator into a stream on success, or into an empty stream on failure?

Rbj*_*bjz 2 asynchronous future rust

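I'd like to take a regular iterator and turn it into a stream so that I can do further stream processing. The trouble is that I may have either an iterator or an error to deal with. I think I'm pretty close with this: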

#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;

use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};

fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
            Some(a) => Some(future::ok((a, iter))),
            None => None,
        }),
        Err(e) => {
            error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
            stream::empty()
        }
    }
}

fn main() {
    let task = resolve("1.2.3.4:12345")
        .map_err(|e| error!("{:?}", e))
        .for_each(|addr| info!("{:?}", addr))
        .fold();
    tokio::run(task);
}

error[E0308]: match arms have incompatible types
  --> src/main.rs:12:5
   |
12 | /     match addrs.to_socket_addrs() {
13 | |         Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | |             Some(a) => Some(future::ok((a, iter))),
15 | |             None => None,
...  |
20 | |         }
21 | |     }
   | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
   |
   = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
              found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
  --> src/main.rs:17:19
   |
17 |           Err(e) => {
   |  ___________________^
18 | |             error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | |             stream::empty()
20 | |         }
   | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:27:10
   |
27 |         .for_each(|addr| info!("{:?}", addr))
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
  --> src/main.rs:28:10
   |
28 |         .fold();
   |          ^^^^
   |
   = note: the method `fold` exists but the following trait bounds were not satisfied:
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

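The hint is pretty obvious: the two results I return from the match have different types, and they should be the same. Now, how can I do this so that I return a stream?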

She*_*ter 5

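Rust is a statically typed language, which means that the return type of a function has to be a single type, known at compile time. You are attempting to return multiple types, decided at runtime.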
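
The same rule can be seen without any async machinery. What follows is a minimal sketch that uses plain standard-library iterators instead of futures streams. The name `resolve_boxed` is hypothetical, and boxing into a trait object is only one general way to give both arms a single type; it is not the approach taken in the rest of this answer.

```rust
use std::iter;
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical helper, for illustration only.
// Without the `Box`, the `Ok` arm would have type `A::Iter` and the `Err` arm
// `std::iter::Empty<SocketAddr>`, so the compiler would reject the `match`
// with the same kind of E0308 "incompatible types" error as in the question.
fn resolve_boxed<A>(addrs: A) -> Box<dyn Iterator<Item = SocketAddr>>
where
    A: ToSocketAddrs,
    A::Iter: 'static,
{
    match addrs.to_socket_addrs() {
        // Both arms are coerced to the one declared return type.
        Ok(iter) => Box::new(iter),
        Err(_) => Box::new(iter::empty()),
    }
}
```

Calling `resolve_boxed("127.0.0.1:8080")` yields the single parsed address, while an input that cannot be parsed at all yields an empty iterator.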

The closest solution to your original is to always return the Unfold stream:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::unfold(addrs.to_socket_addrs(), |r| {
        match r {
            Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
            Err(_) => None,
        }
    })
}
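
The idea of carrying the whole Result as the state, so that only one concrete type is ever returned, can also be sketched with standard-library iterators. This assumes `std::iter::from_fn` as a rough iterator counterpart of `stream::unfold`; the name `resolve_from_fn` is hypothetical and the snippet does not use the futures crate.

```rust
use std::iter;
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical iterator counterpart of the `unfold` version above.
fn resolve_from_fn(addrs: impl ToSocketAddrs) -> impl Iterator<Item = SocketAddr> {
    // The state is the whole `Result`, so the closure has one type
    // regardless of whether resolution succeeded.
    let mut state = addrs.to_socket_addrs();
    iter::from_fn(move || match &mut state {
        // On success, hand out the addresses one at a time.
        Ok(iter) => iter.next(),
        // On failure, end immediately: this is the "empty" case.
        Err(_) => None,
    })
}
```

Because the success and failure cases live inside the state rather than in the return type, no `match` on the outside is needed.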

But why reinvent the wheel?

futures::stream::iter_ok

Converts an Iterator into a Stream which is always ready to yield the next value.

Later versions of the futures crate implement Stream for Either, which makes this very elegant:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::iter_ok(iter).left_stream(),
        Err(_) => stream::empty().right_stream(),
    }
}

It's trivial to backport this functionality to futures 0.1 (maybe someone should submit it as a PR for those who are stuck on 0.1...):

enum MyEither<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Stream for MyEither<L, R>
where
    L: Stream,
    R: Stream<Item = L::Item, Error = L::Error>,
{
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self {
            MyEither::Left(l) => l.poll(),
            MyEither::Right(r) => r.poll(),
        }
    }
}

trait EitherStreamExt {
    fn left_stream<R>(self) -> MyEither<Self, R>
    where
        Self: Sized;
    fn right_stream<L>(self) -> MyEither<L, Self>
    where
        Self: Sized;
}

impl<S: Stream> EitherStreamExt for S {
    fn left_stream<R>(self) -> MyEither<Self, R> {
        MyEither::Left(self)
    }
    fn right_stream<L>(self) -> MyEither<L, Self> {
        MyEither::Right(self)
    }
}

Better yet, use the fact that Result is an iterator and that Stream::flatten exists:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::iter_ok(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .flatten()
}

Or, if you really want to print the errors:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::once(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .map_err(|e| eprintln!("err: {}", e))
        .flatten()
}
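
The fact that a Result can be iterated, which both `flatten` versions rely on, can be checked with the standard library alone. This is a minimal sketch using `Iterator::flatten` rather than `Stream::flatten`; the name `resolve_flat` is hypothetical, and errors are silently dropped here rather than printed.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical iterator counterpart of the `flatten` versions above.
fn resolve_flat(addrs: impl ToSocketAddrs) -> impl Iterator<Item = SocketAddr> {
    addrs
        .to_socket_addrs() // io::Result<A::Iter>
        .into_iter() // yields the inner iterator once on Ok, nothing on Err
        .flatten() // yields each address of that inner iterator
}
```

An `Err` becomes an iterator of zero inner iterators, so flattening it gives the empty case without a `match`.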


  • @FrederikBaetens It's now [`stream::iter`](https://docs.rs/futures/latest/futures/stream/fn.iter.html). (2 upvotes)