Rust Mutex is not working when using a callback function from multiple C threads created by `fork`

Ben*_*ijl 3 multithreading mutex ffi rust

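I am using the C library Cuba, which uses a callback function that is called from multiple threads created in C. The Cuba parallelization is based on the fork/wait POSIX functions instead of pthreads (arxiv.org/abs/1408.6373). It gives the current thread in the core parameter.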

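I am trying to log results from this callback function to the screen and to a file. If I use println! I get the expected output, but if I use slog with a Mutex drain the output is mangled. If I use the async drain I get no output at all.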

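Is the Mutex not locking because it cannot see that the function is actually called from another thread? I tried to recreate the problem with Rust threads, but couldn't. Preferably I'd like to get the async drain to work.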

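Below is an example program that shows the problematic behaviour. The callback gets the last argument of the vegas function as one of its arguments. This is a vector of clones of a logger, so each core should have its own copy of the logger: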

#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}


She*_*ter 8

The Cuba C library has this to say:

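Windows users: Cuba 3 and up uses fork(2) to parallelize the execution threads. This POSIX function is not part of the Windows API, however, and is furthermore used in an essential way such that it cannot be worked around simply with CreateProcess etc. The only feasible emulation seems to be available through Cygwin.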

Here's a reproduction of the code. We fork and then the child and the parent attempt to hold the mutex while printing stuff out. A sleep is inserted to encourage the OS scheduler to try other threads:

use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}

Using fork with threads is really a pain to deal with; I distinctly remember hunting down terrible problems related to this before. Two resources I found that go in depth:

The latter says (emphasis mine):

Can I create mutex before fork-ing?

Yes - however the child and parent process will not share virtual memory and each one will have a mutex independent of the other.

(Advanced note: There are advanced options using shared memory that allow a child and parent to share a mutex if it's created with the correct options and uses a shared memory segment. See procs, fork(), and mutexes)
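The quoted point can be checked with a small, dependency-free sketch. It assumes a Unix-like system where `pid_t` is a 32-bit integer and the exit code sits in bits 8..16 of the wait status (as on Linux and macOS); `fork`, `waitpid` and `_exit` are declared by hand instead of going through the nix crate, and `values_after_fork` is a hypothetical helper name. The parent keeps its copy of the mutex locked while it waits, yet the child still locks and modifies its own copy, and the parent never sees that change:

```rust
use std::sync::Mutex;

// Hand-written declarations of POSIX functions from the platform C library
// (assumption: pid_t is i32, which holds on Linux and macOS).
extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn _exit(code: i32) -> !;
}

/// Forks once and returns (value the child ended up with, value the parent still has).
fn values_after_fork() -> (i32, i32) {
    let shared = Mutex::new(10);

    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");

    if pid == 0 {
        // Child: this locks the child's private copy of the mutex, so it
        // succeeds whether or not the parent holds its copy right now.
        let mut guard = shared.lock().unwrap();
        *guard += 1;
        // Report the child's value through the exit status and leave at once.
        unsafe { _exit(*guard) }
    }

    // Parent: take the lock and keep holding it until the child has exited.
    // If the lock were shared between the processes, the child could block here.
    let guard = shared.lock().unwrap();
    let mut status = 0;
    let waited = unsafe { waitpid(pid, &mut status, 0) };
    assert_eq!(waited, pid, "waitpid failed");

    // Assumption: exit code is stored in bits 8..16 of the status (WEXITSTATUS).
    let child_value = (status >> 8) & 0xff;
    (child_value, *guard)
}

fn main() {
    let (child, parent) = values_after_fork();
    // Prints "child saw 11, parent still sees 10".
    println!("child saw {}, parent still sees {}", child, parent);
}
```

Neither the lock state nor the protected value crosses the process boundary, which is consistent with a Mutex-wrapped drain failing to keep log lines from different forked workers apart.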


If I use the async drain I get no output at all.

See also:


I would not trust the Cuba Rust library. There are two main points:

  1. If there are threads being created, the user data generic type should have a Sync or Send bound on it, restricting it to types that are safe to share or transfer between threads.

  2. The user data passed to the integrand function should not be a &mut. A fundamental Rust concept is that there can only be a single mutable reference to any piece of data at any time. Cuba trivially allows you to circumvent this.
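As a rough illustration of those two points, here is what a wrapper signature could look like if the workers were real threads. This is a sketch only, not the Cuba crate's API: `run_on_cores` and `record` are hypothetical names, and the example uses `std::thread::scope` rather than any FFI. The user data is bounded by `Sync` and handed to the integrand as a shared `&T`, so any mutation has to go through something like a `Mutex`:

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical wrapper: calls `integrand` once per core, each on its own thread.
/// `T: Sync` means a shared `&T` may be used from several threads at once,
/// and the integrand never receives a `&mut T`.
fn run_on_cores<T: Sync>(user_data: &T, cores: i32, integrand: fn(&T, i32)) {
    thread::scope(|scope| {
        for core in 0..cores {
            // `&T` is `Send` because `T: Sync`; the fn pointer is `Copy`.
            scope.spawn(move || integrand(user_data, core));
        }
        // All spawned threads are joined when the scope ends.
    });
}

/// Example integrand: mutation goes through the Mutex, not through `&mut`.
fn record(log: &Mutex<Vec<i32>>, core: i32) {
    log.lock().unwrap().push(core);
}

fn main() {
    let log = Mutex::new(Vec::new());
    run_on_cores(&log, 4, record);

    let mut seen = log.into_inner().unwrap();
    seen.sort();
    println!("{:?}", seen); // [0, 1, 2, 3]
}
```

With this shape the compiler rejects user data that is not safe to share between threads. It does nothing for workers created with fork, where each process has its own copy of the data anyway.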

Here's an attempted reproduction of the Cuba Rust and C libraries:

#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}

I had to make numerous changes to get the Rust compiler to ignore the huge amount of unsafety and rule-breaking that the Cuba library and related FFI is performing.

This code example does actually print out 4 log statements each in order, so this is not a complete answer. However, I'm fairly certain that the Cuba library is triggering undefined behavior, which means that any outcome is possible, including apparently working.
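Whatever the exact cause, a lock that lives in one process's memory cannot order output between forked workers. One possible workaround for the file side of the logging, sketched here under assumptions and not taken from slog or Cuba, is to build each record completely and hand it to the OS as one buffer on a file opened in append mode. `append_record` and the file name are hypothetical; for small records `write_all` will normally issue a single write, but that is not guaranteed:

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: appends one complete, single-line record to `path`.
fn append_record(path: &Path, core: i32, message: &str) -> io::Result<()> {
    // Build the whole record first and escape newlines, so that one record
    // is one line and reaches the OS as a single buffer.
    let record = format!("[core {}] {}\n", core, message.replace('\n', "\\n"));

    // Append mode makes every write land at the current end of the file.
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    file.write_all(record.as_bytes())
}

fn main() -> io::Result<()> {
    // Hypothetical log location, chosen only for this sketch.
    let path = std::env::temp_dir().join("cuba_log_sketch.txt");
    append_record(&path, 3, "A\nB\nC")?;
    print!("{}", std::fs::read_to_string(&path)?);
    Ok(())
}
```

Each call opens the file itself, so the helper does not depend on any state inherited across fork.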

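  • @BenRuijl *I didn't find a better way to get some form of memory safety* - if you are exposing memory unsafety to the user, you need to mark the function as unsafe and document how the user is expected to avoid that unsafety. Making code appear safe when it isn't does a disservice to everyone.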