Dav*_*ner 7 concurrency rust async-await rust-tokio
我正在尝试使用 MPSC 构建多线程应用程序,但遇到了标题中的错误。我不确定这个用例的正确模式是什么 - 我正在寻找一种模式,它允许我克隆生产者通道并将其移动到要使用的新线程中。
这个新线程将保持一个打开的 websocket,并在收到 websocket 消息时通过生产者发送 websocket 消息数据的子集。消费者线程中将需要来自其他线程的数据,这就是为什么我认为 MPSC 模式是一个合适的选择。
除了标题中的消息之外,它还显示以下内容:
`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
Run Code Online (Sandbox Code Playgroud)
我可以/应该Send为此实施吗?Rc这是使用or的合适时机吗Pin?我相信这种情况正在发生,因为我试图发送一个未在闭包Send中实现的类型,但我不知道如何利用它或在这种情况下要达到什么目的。.awaitasync
我已经能够将我的问题简化为:
use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};
#[tokio::main]
async fn main() {
let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let mut s = stream::iter(a.iter())
.cycle()
.for_each(move |int| async {
tx.send(*int);
})
.await;
});
}
Run Code Online (Sandbox Code Playgroud)
您的代码有几个问题。move第一个是最里面的块中缺少 a async,因此编译器尝试借用对tx. Sender这就是为什么您会收到(type of tx) does not Implement 的错误Sync。
添加缺失的内容后,move您会收到不同的错误:
error[E0507]: cannot move out of `tx`, a captured variable in an `FnMut` closure
Run Code Online (Sandbox Code Playgroud)
现在的问题是,for_each()将多次调用闭包,因此实际上不允许您移动到异步块中 - 因为在第一次调用闭包后将tx没有任何内容可以移动。
由于 MPSC 通道允许多个生产者、Sender实现Clone,因此您可以tx在将其移动到异步块之前简单地进行克隆。这编译:
let (tx, _rx): (Sender<i32>, Receiver<i32>) = channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let _s = stream::iter(a.iter())
.cycle()
.for_each(move |int| {
let tx = tx.clone();
async move {
tx.send(*int).unwrap();
}
})
.await;
});
Run Code Online (Sandbox Code Playgroud)
最后,正如评论中指出的,您几乎肯定想在此处使用异步通道。虽然您创建的通道是无界的,所以发送者永远不会阻塞,但接收者在没有消息时会阻塞,从而停止整个执行器线程。
碰巧,tokio MPSC 通道的发送方也实现了Sync,允许与您问题中的代码接近的代码进行编译:
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let _s = stream::iter(a.iter())
.cycle()
.for_each(|int| async {
tx.send(*int).unwrap();
})
.await;
});
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3190 次 |
| 最近记录: |