为什么Rust互斥锁似乎没有将锁锁定到想要最后锁定它的线程?

Bla*_*tus 8 multithreading mutex shared-memory rust

我想编写一个程序,该程序产生两个线程,这些线程锁定a Mutex,增加它,打印内容然后解锁,Mutex以便另一个线程可以执行相同的操作。我添加了一些睡眠时间以使其更加一致,因此我认为输出应为:

ping pong ping pong …  
Run Code Online (Sandbox Code Playgroud)

但实际输出是相当随机的。大多数时候,它只是

ping ping ping … pong 
Run Code Online (Sandbox Code Playgroud)

但是根本没有一致性。有时中间也有一个“乒乓球”。

我相信互斥锁可以通过某种方式来确定谁想最后锁定它,但事实并非如此。

  1. 锁定实际上如何工作?
  2. 如何获得所需的输出?
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();
    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        thread::sleep(ten_millis);
        println!("ping ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        thread::sleep(ten_millis);
        println!("pong ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

Pet*_*all 9

Mutex并且RwLock两者都遵循特定于OS的原语,并且不能保证公平。在Windows上,它们都使用SRW锁实现,这些记录专门记录为公平。我没有为其他操作系统做过研究,但是您绝对不能依赖std::sync::Mutex,特别是如果您需要此代码可移植的话。

鲁斯特一种可能的解决方案是 Mutex由提供的实现parking_lot板条箱,其提供了一种unlock_fair方法,它与一个公平算法实现。

parking_lot文档中

默认情况下,互斥锁是不公平的,并且即使该线程在互斥锁上已被长时间阻止,互斥锁仍允许当前线程在另一个线程获得该锁之前重新锁定该互斥锁。这是默认设置,因为它允许更高的吞吐量,因为它避免了在每个互斥锁解锁上强制进行上下文切换。这可能导致一个线程获取互斥锁的次数比其他线程多得多。

但是,在某些情况下,通过强制将锁传递给等待的线程(如果有的话)来确保公平性可能是有益的。这是通过使用此方法而不是MutexGuard正常删除来完成的。

尽管parking_lot::Mutex没有专门使用该unlock_fair方法并不能声称是公平的,但我发现您的代码仅通过做出该开关(运动场)就产生了与pong相同数量的ping ,甚至没有使用该unlock_fair方法。

通常,当防护超出范围时,互斥锁会自动解锁。为了使其公平解锁,您需要在删除防护之前插入此方法调用:

let b = thread::spawn(move || loop {
    let mut data = data1.lock();
    thread::sleep(ten_millis);
    println!("pong ");
    *data += 1;
    if *data > 10 {
        break;
    }
    MutexGuard::unlock_fair(data);
});
Run Code Online (Sandbox Code Playgroud)


Sve*_*rev 5

互斥锁的锁定顺序没有任何保证;第一个线程有可能 100% 的时间获取锁,而第二个线程则为 0%

线程由操作系统调度,以下场景很有可能:

  1. 操作系统为第一个线程提供 CPU 时间并获取锁
  2. 操作系统为第二个线程提供 CPU 时间,但锁被占用,因此它进入睡眠状态
  3. 第一个线程释放了锁,但仍被操作系统允许运行。它进行循环的另一次迭代并重新获取锁
  4. 另一个线程无法继续,因为锁仍然被占用。

如果你给第二个线程更多的时间来获取锁,你会看到预期的乒乓模式,尽管不能保证(坏的操作系统可能决定永远不给你的某些线程分配 CPU 时间):

use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("ping ");
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("pong ");
    });

    a.join().unwrap();
    b.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

您可以通过玩睡眠时间来验证这一点。睡眠时间越短,乒乓交替越不规则,低至10ms时,您可能会看到乒乓等。

本质上,基于时间的解决方案在设计上是糟糕的。您可以通过改进算法来保证“ping”后面跟着“pong”。例如,您可以在奇数上打印“ping”,在偶数上打印“pong”:

use std::sync::{Arc, Mutex};
use std::{thread, time};

const MAX_ITER: i32 = 10;

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data1.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 1 {
                *data += 1;
                println!("ping ");
                break;
            }
        }
    });

    let b = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data2.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 0 {
                *data += 1;
                println!("pong ");
                break;
            }
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

这不是最好的实现,但我尝试对原始代码进行尽可能少的修改。

您也可以考虑使用 a 实现Condvar,在我看来,这是一个更好的解决方案,因为它避免了对互斥锁的忙碌等待(ps:还删除了代码重复):

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

const MAX_ITER: i32 = 10;

fn main() {
    let cv1 = Arc::new((Condvar::new(), Mutex::new(1)));
    let cv2 = cv1.clone();

    let a = thread::spawn(ping_pong_task("ping", cv1, |x| x & 1 == 1));
    let b = thread::spawn(ping_pong_task("pong", cv2, |x| x & 1 == 0));

    a.join().unwrap();
    b.join().unwrap();
}

fn ping_pong_task<S: Into<String>>(
        msg: S, 
        cv: Arc<(Condvar, Mutex<i32>)>, 
        check: impl Fn(i32) -> bool) -> impl Fn() 
{
    let message = msg.into();

    move || {
        let (condvar, mutex) = &*cv;

        let mut value = mutex.lock().unwrap();
        loop {
            if check(*value) {
                println!("{} ", message);
                *value += 1;
                condvar.notify_all();
            }

            if *value > MAX_ITER {
                break;
            }

            value = condvar.wait(value).unwrap();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)