线程中使用的所有内容都必须在 Rust 中“可发送”吗?

4nt*_*ine 3 multithreading rust

我正在尝试在 Rust 中实现并发处理。这是(简化的)代码(playground):

struct Checker {
    unsendable: std::rc::Rc<String> // `Rc` does not implement Send + Sync (`Rc` just for simplicity, a graph actually)
}

impl Checker {
    pub fn check(&self, input: String) -> bool {
        // some logics (no matter)
        true
    }
}

struct ParallelChecker {
    allow_checker: Checker,
    block_checker: Checker
}

impl ParallelChecker {
    // `String` for simplicity here
    pub fn check(&self, input: String) -> bool {
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();

        // scoped thread pool for simplicity (to avoid `self` is not `'static`)
        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = self.allow_checker.check(input.clone());
                // + send result via channel
            });
        });

        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = self.block_checker.check(input.clone());
                // + send result via channel
            });
        });

        true // for simplicity
        // + receive result form the channels
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是,self.allow_checkerself.block_checker没有实施Send

55 |         thread_pool.scope(move |s| {
   |                     ^^^^^ `Rc<String>` cannot be sent between threads safely
   |
   = help: within `Checker`, the trait `Send` is not implemented for `Rc<String>`
   = note: required because it appears within the type `Checker`
   = note: required because of the requirements on the impl of `Send` for `Mutex<Checker>`
   = note: 1 redundant requirements hidden
   = note: required because of the requirements on the impl of `Send` for `Arc<Mutex<Checker>>`
   = note: required because it appears within the type `[closure@src/parallel_checker.rs:55:27: 60:10]`
Run Code Online (Sandbox Code Playgroud)

我的印象是只有通过渠道发送的东西才能实现Send + Sync,我在这里可能错了。

如您所见,线程代码没有任何共享变量(除了self)。如何在不实施Send和不支付同步费用的情况下使其工作?

我试图同步访问(虽然它看起来没用,因为线程中没有共享变量),但没有运气:

    let allow_checker = Arc::new(Mutex::new(self.allow_checker));
        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = allow_checker.lock().unwrap().check(input.clone());
                // + send result via channel
            });
        });
Run Code Online (Sandbox Code Playgroud)

附注。Arc由于性能问题以及要迁移到的大量相关代码,迁移到是非常不受欢迎的Arc

Mat*_* M. 11

我的印象是只有通过渠道发送的东西才能实现Send + Sync,我在这里可能错了。

有点错:

  • 类型是指Send它的值是否可以跨线程发送。例如Send,许多类型是,String是。
  • 类型是Sync指可以从多个线程访问对其值的引用,而不会引起任何数据竞争。也许令人惊讶的是,这意味着它StringSync——由于共享时是不可变的——并且通常T适用Sync于任何&TSend

请注意,这些规则不关心值如何跨线程发送或共享,只关心它们。

这在这里很重要,因为用于启动线程的闭包本身是跨线程发送的:它在“启动器”线程上创建,并在“启动”线程上执行。

因此,这个闭包必须是Send,这意味着它捕获的任何东西都必须是Send

为什么不Rc执行Send

因为它的引用计数是非原子的。

也就是说,如果您有:

  • 线程 A:一个Rc
  • 线程 B:一个Rc(同一个指针)。

然后线程 A 在线程 B 创建克隆的同时删除其句柄,您希望计数为 2(仍然),但由于非原子访问,尽管仍然存在 2 个句柄,但它可能仅为 1:

  • 线程 A 读取计数 (2)。
  • 线程 B 读取计数 (2)。
  • 线程 B 写入递增计数 (3)。
  • 线程 A 写入递减计数 (1)。

然后,下次 B 放下手柄时,该项目将被破坏并释放内存,并且通过最后一个手柄进行的任何进一步访问都会在您的脸上爆炸。

我试图同步访问(虽然它看起来没用,因为线程中没有共享变量),但没有运气:

你不能包装一个类型来使它成为Send,它没有帮助,因为它不会改变基本属性。

在上述竞争条件Rc可能发生,即使有一个Rc裹着Arc<Mutex<...>>

因此!Send具有传染性并“感染”任何包含类型。

Arc由于性能问题以及要迁移到的大量相关代码,迁移到是非常不受欢迎的Arc

Arc本身的性能开销相对较小,因此除非您继续克隆那些Checker,否则这似乎不太重要,您可能会改进 - 传递引用,而不是克隆。

如果不是,则此处较高的开销将来自Mutex(或RwLock)。正如我所提到的,任何不可变的值都是微不足道的,所以如果你可以重构to的内部状态,那么你就可以完全避免 ,而只是包含。CheckerSyncSyncCheckerSyncMutexCheckerArc<State>

如果您目前有可变状态,请考虑提取它,朝着:

struct Checker {
    unsendable: Arc<Immutable>,
}

impl Checker {
    pub fn check(&self, input: String) -> Mutable {
        unimplemented!()
    }
}
Run Code Online (Sandbox Code Playgroud)

或者放弃并行检查的想法。