Adr*_*lle 1 concurrency multithreading mutex condition-variable rust
我正在尝试使用单个消费者和单个生产者实现缓冲区。我只使用了 POSIX 信号量,但是,它们在 Rust 中不可用,我正在尝试使用 Rust 同步原语 ( Mutex, Condvar, Barrier, ...)实现一个微不足道的信号量问题,但我不想使用通道。
我的代码表现得太不规则了,在某些情况下运行良好,有时它只是在某个数字处停止,而在其他情况下它只是不开始计数。
如果我在主线程中等待 1 秒直到我发送Condvar通知,事情似乎会更好,但它并不能保证它不会进入死锁。
如何修复这个程序?我理解Condvar错了吗?
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
struct Buffer {
is_data: Mutex<bool>,
is_data_cv: Condvar,
is_space: Mutex<bool>,
is_space_cv: Condvar,
buffer: Mutex<i32>,
}
fn producer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_space = buffer
.is_space_cv
.wait(buffer.is_space.lock().unwrap())
.unwrap();
if *is_space {
{
let mut hueco = buffer.buffer.lock().unwrap();
*hueco = i;
}
*is_space = false;
{
let mut is_data = buffer.is_data.lock().unwrap();
*is_data = true;
}
buffer.is_data_cv.notify_one();
break;
}
}
}
}
fn consumer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_data = buffer
.is_data_cv
.wait(buffer.is_data.lock().unwrap())
.unwrap();
if *is_data {
{
let hueco = buffer.buffer.lock().unwrap();
println!("{}", *hueco);
}
*is_data = false;
{
let mut is_space = buffer.is_space.lock().unwrap();
*is_space = true;
}
buffer.is_space_cv.notify_one();
break;
}
}
}
}
fn main() {
let buffer = Arc::new(Buffer {
is_data: Mutex::new(false),
is_data_cv: Condvar::new(),
is_space: Mutex::new(true),
is_space_cv: Condvar::new(),
buffer: Mutex::new(0),
});
let b = buffer.clone();
let p = thread::spawn(move || {
producer(b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(b);
});
//thread::sleep_ms(1000);
buffer.is_space_cv.notify_one();
c.join();
}
Run Code Online (Sandbox Code Playgroud)
我鼓励您创建更小的方法并重用现有的 Rust 类型,例如Option. 这将允许您大大简化您的代码 - 只有一个Mutex和一个Condvar:
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
data_cv: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.data_cv.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.data_cv.notify_one();
val
}
}
fn producer(buffer: &Buffer) {
for i in 0..50 {
println!("p: {}", i);
buffer.insert(i);
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..50 {
let val = buffer.remove();
println!("c: {}", val);
}
}
fn main() {
let buffer = Arc::new(Buffer::default());
let b = buffer.clone();
let p = thread::spawn(move || {
producer(&b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(&b);
});
c.join().expect("Consumer had an error");
p.join().expect("Producer had an error");
}
Run Code Online (Sandbox Code Playgroud)
如果您想获得更高的性能(基准测试是否值得),您可以分别Condvar为“空”和“满”条件设置s:
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
is_empty: Condvar,
is_full: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.is_empty.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.is_full.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.is_full.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.is_empty.notify_one();
val
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
982 次 |
| 最近记录: |