我正在尝试构建一个对象,该对象可以管理来自 websocket 的提要,但能够在多个提要之间切换。
有一个Feed特点:
trait Feed {
async fn start(&mut self);
async fn stop(&mut self);
}
Run Code Online (Sandbox Code Playgroud)
共有三个结构体实现Feed:A、B和C。
当start被调用时,它会启动一个无限循环,监听来自 websocket 的消息并处理每条传入的消息。
我想实现一个FeedManager维护单个活动源但可以接收命令来切换它正在使用的源的命令。
enum FeedCommand {
Start(String),
Stop,
}
struct FeedManager {
active_feed_handle: tokio::task::JoinHandle,
controller: mpsc::Receiver<FeedCommand>,
}
impl FeedManager {
async fn start(&self) {
while let Some(command) = self.controller.recv().await {
match command {
FeedCommand::Start(feed_type) => {
// somehow tell the active feed to stop (need channel probably) or kill the task?
if feed_type == "A" {
// replace active feed task with a new tokio task for consuming feed A
} else if feed_type == "B" {
// replace active feed task with a new tokio task for consuming feed B
} else {
// replace active feed task with a new tokio task for consuming feed C
}
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我正在努力了解如何正确管理所有 Tokio 任务。 FeedManager的核心循环是永远监听传入的新命令,但它需要能够生成另一个长期存在的任务而不阻塞它(这样它就可以监听命令)。
我的第一次尝试是:
if feed_type == "A" {
self.active_feed_handle = tokio::spawn(async {
A::new().start().await;
});
self.active_feed_handle.await
}
Run Code Online (Sandbox Code Playgroud)
.await上的 会导致核心循环不再接受命令,对吧?.await并让任务继续运行吗?您可以通过生成一个任务 \xe2\x80\x94 来生成一个长时间运行的 Tokio 任务,而不会阻塞父任务,这是任务存在的主要原因。如果你不这样做.await任务,那么你将不会等待任务:
use std::time::Duration;\nuse tokio::{task, time}; // 1.3.0\n\n#[tokio::main]\nasync fn main() {\n task::spawn(async {\n time::sleep(Duration::from_secs(100)).await;\n eprintln!(\n "You\'ll likely never see this printed \\\n out because the parent task has exited \\\n and so has the entire program"\n );\n });\n}\nRun Code Online (Sandbox Code Playgroud)\n也可以看看:
\n