如何在多线程应用程序中使用VecDeque?

hea*_*ash 3 multithreading deque rust

我正在尝试使用VecDeque. 我想将它用作对所有线程具有读写权限的共享队列。我有以下代码:

use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 +=1;
            thread_1_queue.push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue);

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 +=1;
            thread_2_queue.push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue);

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload);

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)

Playground 链接 (注意它会无休止地运行)

我现在的问题是线程中的克隆不会更新主队列。现在每个线程都有自己的队列,而不是共享一个。如图所示:

use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 +=1;
            thread_1_queue.push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue);

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 +=1;
            thread_2_queue.push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue);

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload);

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)

Fin*_*nis 5

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 += 1;
            thread_1_queue.lock().unwrap().push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 += 1;
            thread_2_queue.lock().unwrap().push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.lock().unwrap().capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload.lock().unwrap());

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)
Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...
Run Code Online (Sandbox Code Playgroud)

解释

Arc创建一个多线程引用计数器,您可以使用它向多个线程共享单个对象。请注意, 的内容始终Arc是不可变的,因为 Rust 中永远不允许对同一对象进行多个可变引用。

这就是为什么你需要一个Mutex内部的。它创造了所谓的内部可变性。这意味着,您可以使用它临时获得对对象的可变访问,同时确保可变访问不会与其他线程发生冲突。

此外,这意味着当另一个线程在lock()已锁定的情况下调用时,它将阻塞另一个线程。这就是所谓的瓶颈,它将限制您从多线程中获得的加速量。

此外,请注意,在两个lock()s 之间,队列的内容可能会发生变化。因此,如果队列中原子发生的事情很重要,则需要在该操作的整个持续时间内保持队列锁定,这会进一步降低加速速度。

更多错误

  • 我认为你混淆了.capacity().len()
  • Result您应该使用of做一些事情.join(),我在这里将简单地做一些事情.unwrap()
  • .len() == 10在多线程场景中不起作用,因为它可以直接从 跳转911。所以对于多线程场景,最好这样做>= 10,这总是有效的。

修复了无法永远运行的代码:

Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...
Run Code Online (Sandbox Code Playgroud)
Thread #1: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #1: [0, 1, 11, 2]
Thread #2: [0, 1, 11, 2, 12]
MainQueue: [0, 1, 11, 2, 12]
Thread #1: [0, 1, 11, 2, 12, 3]
MainQueue: [0, 1, 11, 2, 12, 3]
Thread #2: [0, 1, 11, 2, 12, 3, 13]
MainQueue: [0, 1, 11, 2, 12, 3, 13]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]
Run Code Online (Sandbox Code Playgroud)