如何将函数发送到另一个线程?

Nik*_*sen 5 multithreading unit-testing rust

我正在尝试为Rust项目编写一个更简单的单元测试运行器.我创建了一个TestFixture特性,我的测试夹具结构将实现,类似于继承其他测试框架中的单元测试基类.特点很简单.这是我的测试夹具

pub trait TestFixture {
    fn setup(&mut self) -> () {}
    fn teardown(&mut self) -> () {}
    fn before_each(&mut self) -> () {}
    fn after_each(&mut self) -> () {}
    fn tests(&mut self) -> Vec<Box<Fn(&mut Self)>>
        where Self: Sized {
        Vec::new()
    }
}
Run Code Online (Sandbox Code Playgroud)

我的测试运行功能如下

pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) {
    fixture.setup();

    let _r = fixture.tests().iter().map(|t| {
        let handle = thread::spawn(move || {
            fixture.before_each();
            t(fixture);
            fixture.after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    });

    fixture.teardown();
}
Run Code Online (Sandbox Code Playgroud)

我收到了错误

src/tests.rs:73:22: 73:35 error: the trait `core::marker::Send` is not implemented for the type `T` [E0277]
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 note: `T` cannot be sent between threads safely
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 error: the trait `core::marker::Sync` is not implemented for the type `for<'r> core::ops::Fn(&'r mut T)` [E0277]
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 note: `for<'r> core::ops::Fn(&'r mut T)` cannot be shared between threads safely
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
Run Code Online (Sandbox Code Playgroud)

我尝试在发送到线程的类型周围添加Arcs,没有骰子,同样的错误.

pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) {
    fixture.setup();

    let fix_arc = Arc::new(Mutex::new(fixture));
    let _r = fixture.tests().iter().map(|t| {
        let test_arc = Arc::new(Mutex::new(t));
        let fix_arc_clone = fix_arc.clone();
        let test_arc_clone = test_arc.clone();
        let handle = thread::spawn(move || {
            let thread_test = test_arc_clone.lock().unwrap();
            let thread_fix = fix_arc_clone.lock().unwrap();
            (*thread_fix).before_each();
            (*thread_test)(*thread_fix);
            (*thread_fix).after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    });

    fixture.teardown();
}
Run Code Online (Sandbox Code Playgroud)

样品测试夹具就像是

struct BuiltinTests {
    pwd: PathBuf
}

impl TestFixture for BuiltinTests {
    fn setup(&mut self) {
        let mut pwd = env::temp_dir();
        pwd.push("pwd");

        fs::create_dir(&pwd);
        self.pwd = pwd;
    }

    fn teardown(&mut self) {
        fs::remove_dir(&self.pwd);
    }

    fn tests(&mut self) -> Vec<Box<Fn(&mut BuiltinTests)>> {
        vec![Box::new(BuiltinTests::cd_with_no_args)]
    }
}

impl BuiltinTests {
    fn new() -> BuiltinTests {
        BuiltinTests {
            pwd: PathBuf::new()
        }
    }
}

fn cd_with_no_args(&mut self) {
    let home = String::from("/");
    env::set_var("HOME", &home);

    let mut cd = Cd::new();
    cd.run(&[]);

    assert_eq!(env::var("PWD"), Ok(home));
}

#[test]
fn cd_tests() {
    let mut builtin_tests = BuiltinTests::new();
    test_fixture_runner(&mut builtin_tests);
}
Run Code Online (Sandbox Code Playgroud)

我使用线程的全部意图是与测试运行器隔离.如果测试失败,则会引起恐慌,导致跑步者死亡.感谢您的任何见解,我愿意改变我的设计,如果这将解决恐慌问题.

Vla*_*eev 7

您的代码有几个问题,我将向您展示如何逐一修复它们.

第一个问题是您正在使用map()迭代迭代器.它将无法正常工作,因为它map()是惰性的 - 除非您使用迭代器,否则传递给它的闭包将无法运行.正确的方法是使用for循环:

for t in fixture().tests().iter() {
Run Code Online (Sandbox Code Playgroud)

其次,您通过引用迭代闭包向量:

fixture.tests().iter().map(|t| {
Run Code Online (Sandbox Code Playgroud)

iter()在a上Vec<T>返回一个迭代器,产生类型的项&T,所以你t的类型&Box<Fn(&mut Self)>.但是,默认情况下Box<Fn(&mut T)>不实现Sync(它是一个特征对象,除了您明确指定的之外没有关于底层类型的信息),因此&Box<Fn(&mut T)>不能跨多个线程使用.这就是你看到的第二个错误.

很可能你不想通过引用使用这些闭包; 你可能想完全将它们移动到衍生线程.为此,您需要使用into_iter()而不是iter():

for t in fixture.tests().into_iter() {
Run Code Online (Sandbox Code Playgroud)

现在t将是类型Box<Fn(&mut T)>.但是,它仍然无法跨线程发送.同样,它是一个特征对象,编译器不知道内部包含的类型是什么Send.为此,您需要将Send绑定添加到闭包的类型:

fn tests(&mut self) -> Vec<Box<Fn(&mut Self)+Send>>
Run Code Online (Sandbox Code Playgroud)

现在错误Fn消失了.

最后一个错误是关于Send没有实现的T.我们需要添加一个Send绑定T:

pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {
Run Code Online (Sandbox Code Playgroud)

现在,错误变得更容易理解:

test.rs:18:22: 18:35 error: captured variable `fixture` does not outlive the enclosing closure
test.rs:18         let handle = thread::spawn(move || {
                                ^~~~~~~~~~~~~
note: in expansion of closure expansion
test.rs:18:5: 28:6 note: expansion site
test.rs:15:66: 31:2 note: captured variable is valid for the anonymous lifetime #1 defined on the block at 15:65
test.rs:15 pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {
test.rs:16     fixture.setup();
test.rs:17
test.rs:18     for t in fixture.tests().into_iter() {
test.rs:19         let handle = thread::spawn(move || {
test.rs:20             fixture.before_each();
           ...
note: closure is valid for the static lifetime
Run Code Online (Sandbox Code Playgroud)

发生此错误的原因是您尝试在spawn()ed线程中使用引用.spawn()要求其闭包参数具有'static绑定,即,其捕获的环境不得包含非'static生命周期的引用.但这正是这里发生的事情 - &mut T不是'static.spawn()设计并不禁止避免加入,因此明确写入以禁止传递'static对生成线程的非引用.

请注意,当你使用时&mut T,这个错误是不可避免的,即使你&mut T输入Arc,因为那时的生命周期&mut T将被"存储" Arc,所以Arc<Mutex<&mut T>>也不会'static.

有两种方法可以做你想要的.

首先,您可以使用不稳定的thread::scoped()API.它是不稳定的,因为它表明安全代码中的内存不安全,并且计划将来为它提供某种替代.但是,您可以在夜间使用Rust(它不会导致内存不安全,仅在特殊情况下):

pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {
    fixture.setup();

    let tests = fixture.lock().unwrap().tests();
    for t in tests.into_iter() {
        let f = &mut *fixture;

        let handle = thread::scoped(move || {
            f.before_each();
            t(f);
            f.after_each();
        });

        handle.join();
    }

    fixture.teardown();
}
Run Code Online (Sandbox Code Playgroud)

这段代码编译是因为scoped()它以这样的方式编写,它保证(在大多数情况下)线程不会超过所有捕获的引用.我不得不重新借用,fixture因为否则(因为&mut引用不可复制)它将被移入线程并被fixture.teardown()禁止.此外,我不得不提取tests变量,因为否则互斥锁将在for循环的持续时间内被主线程锁定,这自然会禁止将其锁定在子线程中.

但是,scoped()你无法隔离子线程中的恐慌.如果子线程发生混乱,这种恐慌将从join()调用中重新引发.这通常可能是一个问题,也可能不是,但我认为这对您的代码来说一个问题.

另一种方法是重构代码以Arc<Mutex<..>>从头开始保持夹具:

pub fn test_fixture_runner<T: TestFixture + Send + 'static>(fixture: Arc<Mutex<T>>) {
    fixture.lock().unwrap().setup();

    for t in fixture.lock().unwrap().tests().into_iter() {
        let fixture = fixture.clone();

        let handle = thread::spawn(move || {
            let mut fixture = fixture.lock().unwrap();

            fixture.before_each();
            t(&mut *fixture);
            fixture.after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    }

    fixture.lock().unwrap().teardown();
}
Run Code Online (Sandbox Code Playgroud)

需要注意的是,现在T也已经是'static再次,因为否则就不能与使用thread::spawn(),因为它需要'static.fixture内部闭包内部不是&mut Ta MutexGuard<T>,因此必须将其显式转换&mut T为传递给它t.

这可能看起来过于庞大且不必要地复杂,但是,这种编程语言的设计确实可以防止您在多线程编程中产生许多错误.我们看到的上述每一个错误都是有效的 - 如果被忽略,它们中的每一个都会成为记忆不安全或数据竞争的潜在原因.