多线程可以使用Rust闭包吗?

Jos*_*sen 5 concurrency multithreading closures rust

我希望能够让多个线程评估相同的闭包.我想到的应用程序是并行化数值集成,因此可以将函数域轻松拆分为N个块并交给线程.

这是一个简单的函数,它多次计算提供的闭包并平均结果:

use std::sync::mpsc;
use std::thread;

const THREAD_COUNT: u64 = 4;

fn average<F: Fn(f64) -> f64>(f: F) -> f64 {
    let (tx, rx) = mpsc::channel();
    for id in 0..THREAD_COUNT {
        let thread_tx = tx.clone();
        thread::spawn(move || {
            thread_tx.send(f(id as f64));
        });
    }

    let mut total = 0.0;
    for id in 0..THREAD_COUNT {
        total += rx.recv().unwrap();
    }
    total / THREAD_COUNT as f64
}

fn main() {
    average(|x: f64| -> f64 { x });
}
Run Code Online (Sandbox Code Playgroud)

当我编译时,我收到此错误:

error[E0277]: `F` cannot be sent between threads safely
  --> src/main.rs:10:9
   |
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^ `F` cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`, the trait `std::marker::Send` is not implemented for `F`
   = help: consider adding a `where F: std::marker::Send` bound
   = note: required because it appears within the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`
   = note: required by `std::thread::spawn`
Run Code Online (Sandbox Code Playgroud)

所以我添加+ Send到边界F并得到一个新的错误:

error[E0310]: the parameter type `F` may not live long enough
  --> src/main.rs:10:9
   |
6  | fn average<F: Fn(f64) -> f64 + Send>(f: F) -> f64 {
   |            -- help: consider adding an explicit lifetime bound `F: 'static`...
...
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
   |
note: ...so that the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]` will meet its required lifetime bounds
  --> src/main.rs:10:9
   |
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
Run Code Online (Sandbox Code Playgroud)

所以,我想补充+ 'staticF,并得到这样的:

error[E0382]: capture of moved value: `f`
  --> src/main.rs:11:28
   |
10 |         thread::spawn(move || {
   |                       ------- value moved (into closure) here
11 |             thread_tx.send(f(id as f64));
   |                            ^ value captured here after move
   |
   = note: move occurs because `f` has type `F`, which does not implement the `Copy` trait
Run Code Online (Sandbox Code Playgroud)

所以,我想补充+ CopyF,并得到:

error: the trait `core::marker::Copy` is not implemented for the type `[closure@src/test.rs:115:11: 115:26]
Run Code Online (Sandbox Code Playgroud)

似乎每个线程都需要它自己的闭包副本(因为move)但是闭包没有实现Copy所以没有运气.这对我来说似乎很奇怪,因为如果闭包从不变态,那么多线程访问它们的安全问题是什么?

我可以通过提供常规函数而不是闭包来使代码工作,但这使得我的代码非泛型,即它只适用于特定函数而不是任何函数Fn(f64) -> f64.对于我正在进行的集成类型,集成的函数通常将某些固定变量与集成变量混合在一起,因此使用闭包捕获固定变量似乎很自然.

有没有办法以通用方式进行这种多线程功能评估?我只是想错了吗?

She*_*ter 7

最终的问题围绕谁拥有关闭.所写的代码表明封闭的所有权转移到average.然后,此函数尝试将闭包提供给多个线程,如您所见,这会失败,因为您无法将一个项目提供给多个子项.

但是封闭没有实现Copy所以没有运气

Rust 1.26.0开始,闭包确实实现Clone,Copy如果所有捕获的变量都这样做.这意味着您的最终示例代码现在可以正常工作:

fn average<F: Fn(f64) -> f64 + Send + 'static + Copy>(f: F) -> f64 { /* ... */ }
Run Code Online (Sandbox Code Playgroud)

但是,您的闭包可能无法实现CopyClone.

你不能给出对拥有的闭包的引用,average因为创建的线程thread::spawn可能比调用更长average.当average退出,任何栈上分配的变量将被销毁.任何使用它们都会导致记忆不安全,Rust会阻止这种记忆.

一种解决方案是使用Arc.这将允许多线程上下文中的单个资源的多个共享所有者.克隆包装闭包时,只创建一个新引用.当所有引用消失时,将释放该对象.

use std::{
    sync::{mpsc, Arc},
    thread,
};

const THREAD_COUNT: u64 = 4;

fn average<F>(f: F) -> f64
where
    F: Fn(f64) -> f64 + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel();
    let f = Arc::new(f);

    for id in 0..THREAD_COUNT {
        let thread_tx = tx.clone();
        let f = f.clone();
        thread::spawn(move || {
            thread_tx.send(f(id as f64)).unwrap();
        });
    }

    let mut total = 0.0;
    for _ in 0..THREAD_COUNT {
        total += rx.recv().unwrap();
    }

    total / THREAD_COUNT as f64
}

fn main() {
    average(|x| x);
}
Run Code Online (Sandbox Code Playgroud)

更标准的解决方案是使用范围线程.保证这些线程会在一定时间内退出,这允许您将超出线程的引用传递给线程.

也可以看看: