Trầ*_* Dự 1 multithreading rust
我有以下要求,这是其他编程语言的标准要求,但我不知道如何在 Rust 中做到这一点。
我有一个类,我想编写一个方法来生成满足两个条件的工作线程:
例如,这是我的虚拟代码:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Run Code Online (Sandbox Code Playgroud)
我在 处出现错误thread: JoinHandle<?>
。在这种情况下,线程的类型是什么。我的代码是否正确启动和停止工作线程?
Val*_*tin 10
简而言之, a上的T
in返回传递给 的闭包结果。因此,在您的情况下,您的闭包不会返回任何内容,即(unit)。join()
JoinHandle<T>
thread::spawn()
JoinHandle<?>
JoinHandle<()>
()
除此之外,您的虚拟代码还包含一些其他问题。
run()
不正确,并且至少需要是Result<(), ()>
。thread
字段需要Option<JoinHandle<()>
能够处理 fn stop(&mut self)
消耗join()
的JoinHandle
.&mut self
到闭包,这会带来更多问题,归结为多个可变引用
Mutex<A>
。但是,如果您stop()
随后调用,则可能会导致僵局。但是,由于它是虚拟代码,并且您在评论中进行了澄清。让我尝试用几个例子来澄清你的意思。这包括我重写你的虚拟代码。
如果您在工作线程运行时不需要访问数据,那么您可以创建一个新的struct WorkerData
. 然后,run()
您复制/克隆您需要的数据A
(或者正如我将其重命名的那样Worker
)。然后在闭包中你最终data
再次返回,因此你可以通过 获取它join()
。
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Run Code Online (Sandbox Code Playgroud)
您实际上并不需thread
要这样做Option<JoinHandle<WorkerData>>
,而是可以使用JoinHandle<WorkerData>>
. 因为如果您想再次调用run()
,重新分配保存Worker
.
所以现在我们可以简化Worker
,删除Option
并改为stop
消费thread
,并用创建new() -> Self
代替run(&mut self)
。
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Run Code Online (Sandbox Code Playgroud)
WorkerData
如果您想保留对WorkerData
多个线程之间的引用,那么您需要使用Arc
. 由于您还希望能够改变它,因此您需要使用Mutex
.
如果您只在单个线程中进行变异,那么您也可以使用 a RwLock
,与 a 相比,Mutex
它允许您同时锁定并获取多个不可变引用。
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Run Code Online (Sandbox Code Playgroud)
如果添加一个方法就能够以data
的形式获取Arc<RwLock<WorkerData>>
。然后,如果您在调用之前克隆Arc
并锁定它(内部) ,那么这将导致死锁。为了避免这种情况,任何方法都应该返回or而不是. 这样你就无法调用并导致僵局。RwLock
stop()
data()
&WorkerData
&mut WorkerData
Arc
stop()
如果您确实想停止工作线程,那么您必须使用一个标志来指示它这样做。您可以以共享的形式创建一个标志AtomicBool
。
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Run Code Online (Sandbox Code Playgroud)
如果您希望处理多种类型的任务,分布在多个线程中,那么这里有一个更通用的示例。
您已经提到使用mpsc
. 因此,您可以将Sender
andReceiver
与自定义Task
和TaskResult
枚举一起使用。
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Run Code Online (Sandbox Code Playgroud)
使用发送任务和接收任务结果的示例Worker
如下所示:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Run Code Online (Sandbox Code Playgroud)
在某些情况下,您可能需要实现Send
forTask
和/或TaskResult
。查看“了解发送特征”。
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1689 次 |
最近记录: |