我无法理解如何编写封装在一个结构中的并发异步代码。
我不确定如何准确解释这个问题,所以我会尝试用一个例子来解释。
假设我有一个UdpServer结构。该结构有多个与其行为相关的方法(例如handle_datagram,,,deserialize_datagram等)
如果我想让代码并发,我将生成 tokio 任务,这需要提供给它的闭包是静态的,这意味着我无法&self从内部调用这个任务只要&self不是静态的,这意味着我不能调用self.serialize_datagram().
我理解这个问题(不能保证结构会比线程寿命更长),但看不到解决它的正确方法。我知道可以将函数移出 impl,但这对我来说看起来不是一个好的解决方案。
另外,即使我们暂时假设我可以将其视为&self静态,但出于某种原因,这段代码在我看来仍然不正确(我猜不够生锈)。
另一个“解决方案”是采取self: Arc<Self>而不是&self,但这感觉更糟。
所以我假设有一些我不知道的模式。有人可以向我解释一下我应该如何重构整个事情吗?
示例代码:
struct UdpServer {}
impl UdpServer {
pub async fn run(&self) {
let socket = UdpSocket::bind(self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
// I spawn tokio task to enable concurrency
tokio::spawn(async move {
// But i can't use &self in here because it's not static.
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(()).await;
});
}
}
pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, datagram: Datagram) {
unimplemented!()
}
}
Run Code Online (Sandbox Code Playgroud)
目前唯一的方法是self通过使用 使 Last 任意长Arc。由于run()是 上的一种方法UdpServer,因此需要更改为Arc<Self>,您考虑过但拒绝了,因为感觉更糟。尽管如此,这就是这样做的方法:
pub async fn run(self: Arc<Self>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
tokio::spawn({
let me = Arc::clone(&self);
async move {
let datagram = me.deserialize_datagram(buf).await;
me.handle_datagram(datagram).await;
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
有趣的是,smol 异步运行时实际上可能提供您正在寻找的东西,因为它的执行器具有生命周期。该生命周期与调用者环境中的值相关联,并且执行器上生成的 future 可能会引用它。例如,这样编译:
use futures_lite::future;
use smol::{Executor, net::UdpSocket};
struct Datagram;
struct UdpServer {
addr: String,
}
impl UdpServer {
pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
ex.spawn({
async move {
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(datagram).await;
}
}).detach();
}
}
pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, _datagram: Datagram) {
unimplemented!()
}
}
fn main() {
let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
let ex = Executor::new();
future::block_on(server.run(&ex));
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7638 次 |
| 最近记录: |