`std::sync::mpsc::Sender<i32> 未实现特征 `std::marker::Sync`

Dav*_*ner 7 concurrency rust async-await rust-tokio

我正在尝试使用 MPSC 构建多线程应用程序,但遇到了标题中的错误。我不确定这个用例的正确模式是什么 - 我正在寻找一种模式,它允许我克隆生产者通道并将其移动到要使用的新线程中。

这个新线程将保持一个打开的 websocket,并在收到 websocket 消息时通过生产者发送 websocket 消息数据的子集。消费者线程中将需要来自其他线程的数据,这就是为什么我认为 MPSC 模式是一个合适的选择。

除了标题中的消息之外,它还显示以下内容:

`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
Run Code Online (Sandbox Code Playgroud)

我可以/应该Send为此实施吗?Rc这是使用or的合适时机吗Pin?我相信这种情况正在发生,因为我试图发送一个未在闭包Send中实现的类型,但我不知道如何利用它或在这种情况下要达到什么目的。.awaitasync

我已经能够将我的问题简化为:

use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};

#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();

    tokio::spawn(async move {
        let a = [1, 2, 3];
        let mut s = stream::iter(a.iter())
            .cycle()
            .for_each(move |int| async {
                tx.send(*int);
            })
            .await;
    });
}
Run Code Online (Sandbox Code Playgroud)

use*_*342 5

您的代码有几个问题。move第一个是最里面的块中缺少 a async,因此编译器尝试借用对tx. Sender这就是为什么您会收到(type of tx) does not Implement 的错误Sync

添加缺失的内容后,move您会收到不同的错误:

error[E0507]: cannot move out of `tx`, a captured variable in an `FnMut` closure
Run Code Online (Sandbox Code Playgroud)

现在的问题是,for_each()将多次调用闭包,因此实际上不允许您移动到异步块中 - 因为在第一次调用闭包后将tx没有任何内容可以移动。

由于 MPSC 通道允许多个生产者、Sender实现Clone,因此您可以tx在将其移动到异步块之前简单地进行克隆。这编译:

let (tx, _rx): (Sender<i32>, Receiver<i32>) = channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(move |int| {
            let tx = tx.clone();
            async move {
                tx.send(*int).unwrap();
            }
        })
        .await;
});
Run Code Online (Sandbox Code Playgroud)

操场

最后,正如评论中指出的,您几乎肯定想在此处使用异步通道。虽然您创建的通道是无界的,所以发送者永远不会阻塞,但接收者在没有消息时阻塞,从而停止整个执行器线程。

碰巧,tokio MPSC 通道的发送方也实现了Sync,允许与您问题中的代码接近的代码进行编译:

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

tokio::spawn(async move {
    let a = [1, 2, 3];
    let _s = stream::iter(a.iter())
        .cycle()
        .for_each(|int| async {
            tx.send(*int).unwrap();
        })
        .await;
});

assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));
Run Code Online (Sandbox Code Playgroud)

操场

  • 这个回答不仅正确回答了我的问题,而且还回答了我的问题。它提供了足够的上下文来帮助我理解 Rust 中异步编程的一些我无法从文档中掌握的要点。关于即使对于无界通道,同步接收器如何阻塞的部分对于理解同步和异步模式如何关联特别有帮助。谢谢; 我感谢您花时间以简单而彻底的方式解释这一点。 (6认同)