hea*_*ash 3 multithreading deque rust
我正在尝试使用VecDeque
. 我想将它用作对所有线程具有读写权限的共享队列。我有以下代码:
use std::collections::VecDeque;
use std::{thread, time};
fn main() {
let mut workload = VecDeque::new();
workload.push_back(0);
let mut thread_1_queue = workload.clone();
let thread_1 = thread::spawn(move || {
let mut counter1: i32 = 0;
let some_time = time::Duration::from_millis(50);
loop {
counter1 +=1;
thread_1_queue.push_back(counter1);
println!("Thread #1: {:?}", thread_1_queue);
if counter1 == 10 {
break;
}
thread::sleep(some_time);
};
});
let mut thread_2_queue = workload.clone();
let thread_2 = thread::spawn(move || {
let mut counter2: i32 = 10;
let some_time = time::Duration::from_millis(50);
loop {
counter2 +=1;
thread_2_queue.push_back(counter2);
println!("Thread #2: {:?}", thread_2_queue);
if counter2 == 20 {
break;
}
thread::sleep(some_time);
};
});
let some_time = time::Duration::from_millis(50);
loop {
if workload.capacity() == 10 {
break;
}
println!("MainQueue: {:?}", workload);
thread::sleep(some_time);
}
thread_1.join();
thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)
Playground 链接 (注意它会无休止地运行)
我现在的问题是线程中的克隆不会更新主队列。现在每个线程都有自己的队列,而不是共享一个。如图所示:
use std::collections::VecDeque;
use std::{thread, time};
fn main() {
let mut workload = VecDeque::new();
workload.push_back(0);
let mut thread_1_queue = workload.clone();
let thread_1 = thread::spawn(move || {
let mut counter1: i32 = 0;
let some_time = time::Duration::from_millis(50);
loop {
counter1 +=1;
thread_1_queue.push_back(counter1);
println!("Thread #1: {:?}", thread_1_queue);
if counter1 == 10 {
break;
}
thread::sleep(some_time);
};
});
let mut thread_2_queue = workload.clone();
let thread_2 = thread::spawn(move || {
let mut counter2: i32 = 10;
let some_time = time::Duration::from_millis(50);
loop {
counter2 +=1;
thread_2_queue.push_back(counter2);
println!("Thread #2: {:?}", thread_2_queue);
if counter2 == 20 {
break;
}
thread::sleep(some_time);
};
});
let some_time = time::Duration::from_millis(50);
loop {
if workload.capacity() == 10 {
break;
}
println!("MainQueue: {:?}", workload);
thread::sleep(some_time);
}
thread_1.join();
thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};
fn main() {
let workload = Arc::new(Mutex::new(VecDeque::new()));
workload.lock().unwrap().push_back(0);
let thread_1_queue = workload.clone();
let thread_1 = thread::spawn(move || {
let mut counter1: i32 = 0;
let some_time = time::Duration::from_millis(50);
loop {
counter1 += 1;
thread_1_queue.lock().unwrap().push_back(counter1);
println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());
if counter1 == 10 {
break;
}
thread::sleep(some_time);
}
});
let thread_2_queue = workload.clone();
let thread_2 = thread::spawn(move || {
let mut counter2: i32 = 10;
let some_time = time::Duration::from_millis(50);
loop {
counter2 += 1;
thread_2_queue.lock().unwrap().push_back(counter2);
println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());
if counter2 == 20 {
break;
}
thread::sleep(some_time);
}
});
let some_time = time::Duration::from_millis(50);
loop {
if workload.lock().unwrap().capacity() == 10 {
break;
}
println!("MainQueue: {:?}", workload.lock().unwrap());
thread::sleep(some_time);
}
thread_1.join();
thread_2.join();
}
Run Code Online (Sandbox Code Playgroud)
Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...
Run Code Online (Sandbox Code Playgroud)
Arc
创建一个多线程引用计数器,您可以使用它向多个线程共享单个对象。请注意, 的内容始终Arc
是不可变的,因为 Rust 中永远不允许对同一对象进行多个可变引用。
这就是为什么你需要一个Mutex
内部的。它创造了所谓的内部可变性。这意味着,您可以使用它临时获得对对象的可变访问,同时确保可变访问不会与其他线程发生冲突。
此外,这意味着当另一个线程在lock()
已锁定的情况下调用时,它将阻塞另一个线程。这就是所谓的瓶颈,它将限制您从多线程中获得的加速量。
此外,请注意,在两个lock()
s 之间,队列的内容可能会发生变化。因此,如果队列中原子发生的事情很重要,则需要在该操作的整个持续时间内保持队列锁定,这会进一步降低加速速度。
.capacity()
和.len()
。Result
您应该使用of做一些事情.join()
,我在这里将简单地做一些事情.unwrap()
。.len() == 10
在多线程场景中不起作用,因为它可以直接从 跳转9
到11
。所以对于多线程场景,最好这样做>= 10
,这总是有效的。修复了无法永远运行的代码:
Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...
Run Code Online (Sandbox Code Playgroud)
Thread #1: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #1: [0, 1, 11, 2]
Thread #2: [0, 1, 11, 2, 12]
MainQueue: [0, 1, 11, 2, 12]
Thread #1: [0, 1, 11, 2, 12, 3]
MainQueue: [0, 1, 11, 2, 12, 3]
Thread #2: [0, 1, 11, 2, 12, 3, 13]
MainQueue: [0, 1, 11, 2, 12, 3, 13]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]
Run Code Online (Sandbox Code Playgroud)