使用 Tokio 期货的多播 UDP 数据包

Mat*_*ahl 5 sockets udp rust rust-tokio

我正在玩 Tokio 和 Rust,作为一个例子,我正在尝试编写一个简单的 UDP 代理,它只会在一个套接字上接受 UDP 数据包并将其发送到多个其他目的地。但是,我偶然发现需要将接收到的数据包发送到多个地址的情况,并且不确定如何以惯用的方式做到这一点。

我有这么远的代码:

extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() {
    let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
    let forwarder = {
        let socket = UdpSocket::bind(&listen_address).unwrap();
        let peers = vec![
            "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
            "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
        ];
        UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
            move |(bytes, _from)| {
                // These are the problematic lines
                for peer in peers.iter() {
                    socket.send_dgram(&bytes, &peer);
                }
                Ok(())
            },
        )
    };

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
}
Run Code Online (Sandbox Code Playgroud)

有问题的线路试图使用新绑定的套接字将接收到的数据包发送到多个其他地址。

现有的示例都将数据包转发到单个目的地,或者在内部使用 mpsc 通道在内部任务之间进行通信。我不认为这是必要的,并且应该可以在不必为每个侦听套接字生成多个任务的情况下完成。

更新:感谢@Ömer-erden,我得到了这个有效的代码。

extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
    let socket = UdpSocket::bind(&listen_address)?;
    let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
    let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
    let forwarder = reader.for_each(move |(bytes, _from)| {
        for peer in peers.iter() {
            writer.start_send((bytes.clone().into(), peer.clone()))?;
        }
        writer.poll_complete()?;
        Ok(())
    });

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

注意:

  • 没有必要poll_completion为 each调用start_send:只需要在所有start_send已调度后调用即可。

  • 出于某种原因,peer在调用之间删除了 的内容(但没有编译器错误),生成错误 22(这通常是因为给了错误地址sendto(2))。

    查看调试器,很明显第二次,对等地址指向无效内存。我选择克隆它peer

  • 我删除了调用并向上unwrap()传播Result

Öme*_*den 2

您的代码有一个逻辑错误:您试图将同一地址绑定两次,分别作为发送者和接收者。相反,您可以使用stream和sinkUdpFramed具有提供该功能的功能,请参阅Sink

ASink是一个可以异步发送其他值的值。

let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
    let (mut socket_sink, socket_stream) =
        UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).split();
    let peers = vec![
        "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
        "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
    ];

    socket_stream.for_each(move |(bytes, _from)| {
        for peer in peers.iter() {
            socket_sink.start_send((bytes.clone().into(), *peer));
            socket_sink.poll_complete();
        }
        Ok(())
    })
};

tokio::run({
    forwarder
        .map_err(|err| println!("Error: {}", err))
        .map(|_| ())
});
Run Code Online (Sandbox Code Playgroud)