使用 Mutex 和 Condvar 在 Rust 中缓冲

Adr*_*lle 1 concurrency multithreading mutex condition-variable rust

我正在尝试使用单个消费者和单个生产者实现缓冲区。我只使用了 POSIX 信号量,但是,它们在 Rust 中不可用,我正在尝试使用 Rust 同步原语 ( Mutex, Condvar, Barrier, ...)实现一个微不足道的信号量问题,但我不想使用通道。

我的代码表现得太不规则了,在某些情况下运行良好,有时它只是在某个数字处停止,而在其他情况下它只是不开始计数。

如果我在主线程中等待 1 秒直到我发送Condvar通知,事情似乎会更好,但它并不能保证它不会进入死锁。

如何修复这个程序?我理解Condvar错了吗?

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

struct Buffer {
    is_data: Mutex<bool>,
    is_data_cv: Condvar,
    is_space: Mutex<bool>,
    is_space_cv: Condvar,
    buffer: Mutex<i32>,
}

fn producer(buffer: Arc<Buffer>) {
    for i in 0..50 {
        loop {
            let mut is_space = buffer
                .is_space_cv
                .wait(buffer.is_space.lock().unwrap())
                .unwrap();
            if *is_space {
                {
                    let mut hueco = buffer.buffer.lock().unwrap();
                    *hueco = i;
                }

                *is_space = false;
                {
                    let mut is_data = buffer.is_data.lock().unwrap();
                    *is_data = true;
                }
                buffer.is_data_cv.notify_one();
                break;
            }
        }
    }
}

fn consumer(buffer: Arc<Buffer>) {
    for i in 0..50 {
        loop {
            let mut is_data = buffer
                .is_data_cv
                .wait(buffer.is_data.lock().unwrap())
                .unwrap();
            if *is_data {
                {
                    let hueco = buffer.buffer.lock().unwrap();
                    println!("{}", *hueco);
                }
                *is_data = false;
                {
                    let mut is_space = buffer.is_space.lock().unwrap();
                    *is_space = true;
                }
                buffer.is_space_cv.notify_one();
                break;
            }
        }
    }
}

fn main() {
    let buffer = Arc::new(Buffer {
        is_data: Mutex::new(false),
        is_data_cv: Condvar::new(),
        is_space: Mutex::new(true),
        is_space_cv: Condvar::new(),
        buffer: Mutex::new(0),
    });
    let b = buffer.clone();
    let p = thread::spawn(move || {
        producer(b);
    });
    let b = buffer.clone();
    let c = thread::spawn(move || {
        consumer(b);
    });

    //thread::sleep_ms(1000);

    buffer.is_space_cv.notify_one();
    c.join();
}
Run Code Online (Sandbox Code Playgroud)

She*_*ter 6

我鼓励您创建更小的方法并重用现有的 Rust 类型,例如Option. 这将允许您大大简化您的代码 - 只有一个Mutex和一个Condvar

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug, Default)]
struct Buffer {
    data: Mutex<Option<i32>>,
    data_cv: Condvar,
}

impl Buffer {
    fn insert(&self, val: i32) {
        let mut lock = self.data.lock().expect("Can't lock");
        while lock.is_some() {
            lock = self.data_cv.wait(lock).expect("Can't wait");
        }
        *lock = Some(val);
        self.data_cv.notify_one();
    }

    fn remove(&self) -> i32 {
        let mut lock = self.data.lock().expect("Can't lock");
        while lock.is_none() {
            lock = self.data_cv.wait(lock).expect("Can't wait");
        }
        let val = lock.take().unwrap();
        self.data_cv.notify_one();
        val
    }
}

fn producer(buffer: &Buffer) {
    for i in 0..50 {
        println!("p: {}", i);
        buffer.insert(i);
    }
}

fn consumer(buffer: &Buffer) {
    for _ in 0..50 {
        let val = buffer.remove();
        println!("c: {}", val);
    }
}

fn main() {
    let buffer = Arc::new(Buffer::default());

    let b = buffer.clone();
    let p = thread::spawn(move || {
        producer(&b);
    });

    let b = buffer.clone();
    let c = thread::spawn(move || {
        consumer(&b);
    });

    c.join().expect("Consumer had an error");
    p.join().expect("Producer had an error");
}
Run Code Online (Sandbox Code Playgroud)

如果您想获得更高的性能(基准测试是否值得),您可以分别Condvar为“空”和“满”条件设置s:

#[derive(Debug, Default)]
struct Buffer {
    data: Mutex<Option<i32>>,
    is_empty: Condvar,
    is_full: Condvar,
}

impl Buffer {
    fn insert(&self, val: i32) {
        let mut lock = self.data.lock().expect("Can't lock");
        while lock.is_some() {
            lock = self.is_empty.wait(lock).expect("Can't wait");
        }
        *lock = Some(val);
        self.is_full.notify_one();
    }

    fn remove(&self) -> i32 {
        let mut lock = self.data.lock().expect("Can't lock");
        while lock.is_none() {
            lock = self.is_full.wait(lock).expect("Can't wait");
        }
        let val = lock.take().unwrap();
        self.is_empty.notify_one();
        val
    }
}
Run Code Online (Sandbox Code Playgroud)