Ben*_*ijl 3 multithreading mutex ffi rust
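I'm using the C library Cuba, which uses a callback function that is called from multiple threads created in C. Cuba's parallelization is based on the POSIX fork/wait functions rather than pthreads (arxiv.org/abs/1408.6373). It passes the current thread in the core parameter.
I'm trying to log results from this callback function to the screen and to a file. If I use println!, I get the expected output, but if I use slog with a Mutex drain, the output is mangled. If I use the async drain, I get no output at all.
Is the Mutex not locking because it can't see that the function is actually being called from another thread? I tried to recreate the problem with Rust threads, but couldn't. Preferably, I'd like to get the async drain to work.
Below is an example program that shows the problematic behaviour. The callback receives the last argument of the vegas function as one of its arguments. This is a vector of clones of the logger, so that each core should have its own copy of the logger: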
#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");
    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();
    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores
    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}
Output:
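The Cuba C library says:
Windows users: Cuba 3 and up uses fork(2) to parallelize the execution threads. This POSIX function is not part of the Windows API, however, and is furthermore used in an essential way such that it cannot be worked around simply with CreateProcess etc. The only feasible emulation seems to be available through Cygwin.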
Here's a reproduction of the code. We fork and then the child and the parent attempt to hold the mutex while printing stuff out. A sleep is inserted to encourage the OS scheduler to try other threads:
use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
Using fork with threads is really a pain to deal with; I distinctly remember hunting down terrible problems related to this before. Two resources I found that go in depth:
The latter says (emphasis mine):
Can I create mutex before fork-ing?
Yes - however the child and parent process will not share virtual memory and each one will have a mutex independent of the other.
(Advanced note: There are advanced options using shared memory that allow a child and parent to share a mutex if it's created with the correct options and uses a shared memory segment. See procs, fork(), and mutexes)
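To make the quoted point concrete, here is a minimal sketch that is not taken from Cuba or from the reproduction above. It assumes a Unix target where `pid_t` is a 32-bit integer and a toolchain that accepts `unsafe extern` blocks (Rust 1.82 or later). The `fork`, `waitpid` and `_exit` declarations are written by hand to avoid extra crates, and `parent_value_after_child_write` is a made-up helper name. The child overwrites the value behind the mutex, yet the parent still reads its own untouched copy:

```rust
use std::sync::Mutex;

// Hand-written declarations of the POSIX calls so the sketch needs no crates.
// Assumption: a Unix target where pid_t is a 32-bit int (e.g. Linux, macOS).
unsafe extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn _exit(code: i32) -> !;
}

/// Forks, lets the child overwrite the mutex-protected value, and returns
/// what the parent sees after the child has exited.
fn parent_value_after_child_write() -> i32 {
    let shared = Mutex::new(10);

    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");

    if pid == 0 {
        // Child: this locks and modifies the child's private copy only.
        *shared.lock().unwrap() = 99;
        unsafe { _exit(0) }
    }

    // Parent: wait for the child to finish, then read our own copy.
    let mut status = 0;
    let waited = unsafe { waitpid(pid, &mut status, 0) };
    assert_eq!(waited, pid);

    let value = *shared.lock().unwrap();
    value
}

fn main() {
    println!("parent sees {}", parent_value_after_child_write());
}
```

Because each process locks its own copy of the mutex, the lock never makes one process wait for the other, which is consistent with the interleaved output described in the question.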
If I use the async drain I get no output at all.
See also:
I would not trust the Cuba Rust library. There are two main points:
If threads are being created, the generic user data type should carry a Sync or Send bound, restricting it to types that are safe to share or transfer between threads.
The user data passed to the integrand function should not be a &mut. A fundamental Rust concept is that there can only be a single mutable reference to any piece of data at any time. Cuba trivially allows you to circumvent this.
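As a rough illustration of those two points, a wrapper could take the user data by shared reference and require `T: Sync`. This is only a sketch using plain Rust threads. `run_integrand` and `weight` are hypothetical names, not part of the cuba crate, and the sketch does not cover the fork-based parallelism that Cuba actually uses:

```rust
use std::thread;

/// Hypothetical safer signature: the user data is only shared (`&T`), and
/// the `T: Sync` bound makes the compiler reject data that is not safe to
/// access from several threads at once.
fn run_integrand<T: Sync>(
    user_data: &T,
    integrand: fn(&T, usize) -> f64,
    cores: usize,
) -> Vec<f64> {
    thread::scope(|scope| {
        // One scoped thread per "core"; each gets the same shared reference.
        let handles: Vec<_> = (0..cores)
            .map(|core| scope.spawn(move || integrand(user_data, core)))
            .collect();
        // Join in spawn order so the results are ordered by core index.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

/// Example callback: reads the shared table without mutating it.
fn weight(table: &Vec<f64>, core: usize) -> f64 {
    table[core] * 2.0
}

fn main() {
    let table = vec![1.0, 2.0, 3.0, 4.0];
    let results = run_integrand(&table, weight, 4);
    println!("{:?}", results);
}
```

With this shape, user data that needs mutation has to be wrapped in something like a `Mutex` explicitly, and a type that is not `Sync` (for example one containing an `Rc`) fails to compile instead of silently being aliased.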
Here's an attempted reproduction of the Cuba Rust and C libraries:
#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();
    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}
I had to make numerous changes to get the Rust compiler to ignore the huge amount of unsafety and rule-breaking that the Cuba library and related FFI is performing.
This code example does actually print out 4 log statements each in order, so this is not a complete answer. However, I'm fairly certain that the Cuba library is triggering undefined behavior, which means that any outcome is possible, including apparently working.