How to start and stop a worker thread

Trầ*_* Dự 1 multithreading rust

I have the following requirement, which is standard in other programming languages, but I don't know how to do it in Rust.

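I have a class, and I want to write a method that spawns a worker thread satisfying two conditions:

  • After spawning the worker thread, the function returns (so nothing else has to wait)
  • There is a mechanism to stop this thread.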

For example, here is my dummy code:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

I get an error at thread: JoinHandle<?>. What is the type of the thread in this case? And does my code start and stop the worker thread correctly?

Val*_*tin 10

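In short, the T in JoinHandle<T> is the return type of the closure passed to thread::spawn(), and join() hands that value back (wrapped in a thread::Result). In your case the closure doesn't return anything, i.e. it returns () (unit), so JoinHandle<?> should be JoinHandle<()>.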
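A minimal sketch of how the closure's return type determines the handle's type. The function names spawn_unit and spawn_value are hypothetical, chosen only for illustration.

```rust
use std::thread::{self, JoinHandle};

// The closure returns nothing, i.e. `()`, so the handle is a `JoinHandle<()>`.
fn spawn_unit() -> JoinHandle<()> {
    thread::spawn(|| {
        println!("hello world");
    })
}

// The closure returns a `u32`, so the handle is a `JoinHandle<u32>`.
fn spawn_value() -> JoinHandle<u32> {
    thread::spawn(|| (1..=5).sum())
}

fn main() {
    // `join()` returns `thread::Result<T>`: `Ok(value)`, or `Err(..)` if the thread panicked.
    spawn_unit().join().unwrap(); // the `Ok` value here is `()`
    let sum = spawn_value().join().unwrap();
    assert_eq!(sum, 15);
}
```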

Beyond that, your dummy code has a few other issues:

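  • The return type of run() is incorrect; at the very least it would need to be Result<(), ()>.
  • The thread field needs to be Option<JoinHandle<()>> so that fn stop(&mut self) can work, because join() consumes the JoinHandle.
  • However, you are trying to move &mut self into the closure, which brings a lot more problems, boiling down to multiple mutable references (and thread::spawn requires a 'static closure, so it cannot borrow self at all).
    • This could be solved with e.g. Mutex<A>. However, if you then call stop(), that could lead to a deadlock.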

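However, since it's dummy code and you clarified things in the comments, let me try to illustrate what you mean with a few examples, including my rewrite of your dummy code.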

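Result after the worker finishes

If you don't need access to the data while the worker thread is running, you can create a new struct WorkerData. Then, in run(), you copy/clone whatever data you need from A (or Worker, as I've renamed it). Finally, the closure returns data again, so you can get it back through join().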

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get it back through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}
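A runnable sketch of the version above, assuming a hypothetical calls counter as the only field of WorkerData (the real fields are elided above). The loop is written as a for loop that makes the same 6 calls as the i counter version; stop() uses Option::take() so that join() can consume the handle, and a second stop() returns None.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for the real data: it only counts calls.
struct WorkerData {
    calls: u32,
}

impl WorkerData {
    pub fn call(&mut self) {
        self.calls += 1;
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        let mut data = WorkerData { calls: 0 };
        self.thread = Some(thread::spawn(move || {
            // Same number of calls as the `i` loop above, then hand `data` back.
            for _ in 0..6 {
                data.call();
            }
            data
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        // `take()` moves the handle out and leaves `None` behind,
        // which lets `join()` consume the `JoinHandle`.
        self.thread.take().map(|handle| handle.join())
    }
}

fn main() {
    let mut worker = Worker::new();
    worker.run(); // returns immediately; the thread runs in the background
    let data = worker.stop().expect("was running").expect("thread panicked");
    println!("calls = {}", data.calls); // prints "calls = 6"
    assert!(worker.stop().is_none()); // already stopped
}
```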

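You don't actually need thread to be an Option<JoinHandle<WorkerData>>; you could use a plain JoinHandle<WorkerData> instead, because if you wanted to call run() again, it would be easier to just reassign the variable holding the Worker.

So now we can simplify Worker: remove the Option, make stop() consume the thread instead, and spawn it in new() -> Self rather than run(&mut self).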

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone into it whatever the worker needs
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}
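A runnable sketch of the simplified Worker, again assuming a hypothetical calls field. It also shows the "reassign the variable" idea: stop() moves the Worker, and running again simply means assigning a fresh Worker::new() to the same (mut) variable.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical data type for illustration.
struct WorkerData {
    calls: u32,
}

impl WorkerData {
    fn call(&mut self) {
        self.calls += 1;
    }
}

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    // Spawning happens in the constructor, so every `Worker` is already running.
    fn new() -> Self {
        let mut data = WorkerData { calls: 0 };
        let thread = thread::spawn(move || {
            for _ in 0..6 {
                data.call();
            }
            data
        });
        Self { thread }
    }

    // Takes `self` by value: a stopped `Worker` cannot be used again.
    fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

fn main() {
    let mut worker = Worker::new();
    let data = worker.stop().expect("worker thread panicked");
    assert_eq!(data.calls, 6);

    // `stop()` moved `worker`; to "run again", assign a new `Worker` to the same variable.
    worker = Worker::new();
    let data = worker.stop().expect("worker thread panicked");
    assert_eq!(data.calls, 6);
}
```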

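Shared WorkerData

If you want to keep a reference to WorkerData shared between multiple threads, you need to use an Arc. Since you also want to be able to mutate it, you need a Mutex as well.

If you only mutate it from a single thread, you can use an RwLock instead, which, unlike a Mutex, lets you lock and obtain multiple immutable references at the same time.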
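To illustrate the RwLock behaviour in isolation, here is a minimal sketch (the function name readers_then_writer and the Vec<i32> payload are hypothetical): several threads take shared read() locks, then one thread takes the exclusive write() lock.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Several reader threads share the data, then one writer thread modifies it.
fn readers_then_writer() -> Vec<i32> {
    let shared = Arc::new(RwLock::new(vec![1, 2, 3]));

    // Each reader takes a shared `read()` lock; readers do not block each other.
    let readers: Vec<_> = (0..3)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let v = shared.read().unwrap();
                let total: i32 = v.iter().sum();
                total
            })
        })
        .collect();
    for reader in readers {
        assert_eq!(reader.join().unwrap(), 6);
    }

    // The writer's `write()` lock is exclusive: it waits until no reader holds the lock.
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let mut v = shared.write().unwrap();
            v.push(4);
        })
    };
    writer.join().unwrap();

    // Bind the copy to a local so the read guard is dropped before `shared`.
    let result = shared.read().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", readers_then_writer()); // prints "[1, 2, 3, 4]"
}
```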

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone into it whatever the worker needs
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}
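Regarding the comment "You might be able to unwrap and get the inner WorkerData here": a sketch of one way to do it, using Arc::try_unwrap followed by RwLock::into_inner. It assumes a hypothetical calls field and that no other clone of the Arc has escaped; once join() has returned, the worker's clone has been dropped, so the Worker holds the last strong reference.

```rust
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

// Hypothetical data type for illustration.
#[derive(Debug)]
struct WorkerData {
    calls: u32,
}

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    fn new() -> Self {
        let data = Arc::new(RwLock::new(WorkerData { calls: 0 }));
        let thread = thread::spawn({
            let data = Arc::clone(&data);
            move || {
                for _ in 0..6 {
                    if let Ok(mut data) = data.write() {
                        data.calls += 1;
                    }
                }
            }
        });
        Self { thread, data }
    }

    // Variant of `stop()` that returns the inner `WorkerData` instead of the `Arc`.
    fn stop(self) -> thread::Result<WorkerData> {
        // After `join()`, the thread's clone of the `Arc` has been dropped...
        self.thread.join()?;
        // ...so, as long as no other clone escaped, this is the last strong reference.
        let lock = Arc::try_unwrap(self.data).expect("another Arc clone is still alive");
        Ok(lock.into_inner().expect("lock poisoned"))
    }
}

fn main() {
    let data = Worker::new().stop().expect("worker thread panicked");
    println!("{:?}", data); // prints "WorkerData { calls: 6 }"
}
```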

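Suppose you add a method data() that returns the data as an Arc<RwLock<WorkerData>>. If you clone that Arc and lock it (the inner RwLock) before calling stop(), that can result in a deadlock: stop() waits in join() for the worker, while the worker waits in write() for your lock to be released. To avoid this, any data() method should return &WorkerData or &mut WorkerData instead of the Arc (with an RwLock, in practice a read or write guard that borrows self). That way you can't call stop() while holding the lock, and so can't cause the deadlock.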
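A sketch of such an accessor, assuming a hypothetical calls field: data() returns an RwLockReadGuard that borrows the Worker, so the borrow checker rejects a call to stop(self) while the guard is still alive.

```rust
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::thread::{self, JoinHandle};

// Hypothetical data type for illustration.
struct WorkerData {
    calls: u32,
}

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    fn new() -> Self {
        let data = Arc::new(RwLock::new(WorkerData { calls: 0 }));
        let thread = thread::spawn({
            let data = Arc::clone(&data);
            move || {
                for _ in 0..6 {
                    if let Ok(mut data) = data.write() {
                        data.calls += 1;
                    }
                }
            }
        });
        Self { thread, data }
    }

    // Hypothetical accessor: the returned guard borrows `self`, so `stop(self)`
    // cannot be called (it would move `self`) until the guard is dropped.
    fn data(&self) -> RwLockReadGuard<'_, WorkerData> {
        self.data.read().unwrap()
    }

    fn stop(self) -> thread::Result<()> {
        self.thread.join()
    }
}

fn main() {
    let worker = Worker::new();
    {
        let data = worker.data();
        println!("calls so far: {}", data.calls);
        // worker.stop(); // would not compile: `worker` is still borrowed by `data`
    } // guard dropped here, releasing the read lock
    worker.stop().expect("worker thread panicked");
}
```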

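Flag to stop the worker

If you actually want to stop the worker thread early, you have to use a flag to tell it to do so. You can create the flag as a shared AtomicBool (wrapped in an Arc):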

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone into it whatever the worker needs
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}
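A runnable sketch of the stop-flag version under a few assumptions: WorkerData has a hypothetical calls field, each iteration sleeps for 1 ms as a stand-in for real work (the loop above spins without pausing), and stop() returns the final call count instead of the Arc to keep the example short.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical data type for illustration.
struct WorkerData {
    calls: u64,
}

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    fn new() -> Self {
        let data = Arc::new(RwLock::new(WorkerData { calls: 0 }));
        let stop_flag = Arc::new(AtomicBool::new(false));
        let thread = thread::spawn({
            let data = Arc::clone(&data);
            let stop_flag = Arc::clone(&stop_flag);
            move || {
                // Keep working until `stop()` sets the flag.
                while !stop_flag.load(Ordering::Relaxed) {
                    if let Ok(mut data) = data.write() {
                        data.calls += 1;
                    }
                    // Assumption: a short sleep stands in for real work and avoids a busy spin.
                    thread::sleep(Duration::from_millis(1));
                }
            }
        });
        Self { thread, data, stop_flag }
    }

    // Signal the worker, wait for it to exit, then read the final count.
    fn stop(self) -> thread::Result<u64> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        let calls = self.data.read().unwrap().calls;
        Ok(calls)
    }
}

fn main() {
    let worker = Worker::new(); // returns immediately
    thread::sleep(Duration::from_millis(20));
    let calls = worker.stop().expect("worker thread panicked");
    println!("worker ran {} iterations", calls);
}
```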

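Multiple threads and multiple tasks

If you want to handle multiple kinds of tasks, spread across multiple threads, here is a more general example.

You already mentioned using mpsc, so you can use a Sender and Receiver together with custom Task and TaskResult enums.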

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

An example of using Worker to send tasks and receive task results looks like this:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}
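A self-contained sketch of the whole pool plus usage, assuming a hypothetical Task::Square(u64) and TaskResult::Squared(u64) in place of the elided enum bodies. Two small deviations from the code above: the Mutex guard is a temporary that is released as soon as recv() returns (the same effect as the explicit drop), and request() and result_receiver() take &self, since Sender::send only needs a shared reference.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical concrete task and result types, standing in for the `...` above.
pub enum Task {
    Square(u64),
}

#[derive(Debug, PartialEq)]
pub enum TaskResult {
    Squared(u64),
}

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: Sender<Task>,
    result_receiver: Receiver<TaskResult>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel::<Task>();
        let (result_sender, result_receiver) = mpsc::channel();
        // A `Receiver` cannot be cloned, so the threads share one behind a `Mutex`.
        let task_receiver = Arc::new(Mutex::new(task_receiver));
        let stop_flag = Arc::new(AtomicBool::new(false));

        let threads: Vec<JoinHandle<()>> = (0..num_threads)
            .map(|_| {
                let task_receiver = Arc::clone(&task_receiver);
                let result_sender = result_sender.clone();
                let stop_flag = Arc::clone(&stop_flag);
                thread::spawn(move || loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }
                    // The guard is a temporary, so the lock is released right after `recv()`.
                    let task = task_receiver.lock().unwrap().recv();
                    match task {
                        Ok(Task::Square(n)) => {
                            // Ignore `SendError`: it only means the result receiver is gone.
                            let _ = result_sender.send(TaskResult::Squared(n * n));
                        }
                        // All task senders were dropped: no more tasks will arrive.
                        Err(_) => break,
                    }
                })
            })
            .collect();

        Self { threads, task_sender, result_receiver, stop_flag }
    }

    pub fn request(&self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    pub fn result_receiver(&self) -> &Receiver<TaskResult> {
        &self.result_receiver
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        // Dropping the sender wakes threads blocked in `recv()` with an error.
        drop(self.task_sender);
        self.stop_flag.store(true, Ordering::Relaxed);
        self.threads.into_iter().map(|t| t.join()).collect()
    }
}

fn main() {
    let worker = Worker::new(4);
    for n in 1..=5 {
        worker.request(Task::Square(n));
    }
    // Block until all five results arrive; their order depends on scheduling.
    let mut squares: Vec<u64> = (0..5)
        .map(|_| match worker.result_receiver().recv().unwrap() {
            TaskResult::Squared(v) => v,
        })
        .collect();
    squares.sort();
    println!("{:?}", squares); // prints "[1, 4, 9, 16, 25]"
    for r in worker.stop() {
        r.expect("worker thread panicked");
    }
}
```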

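In some cases you might need to implement Send for Task and/or TaskResult; see "Understanding the Send trait". Note that unsafe impl Send tells the compiler to trust you: it is only sound if values of the type really can be moved to another thread safely. The compiler already implements Send automatically for any type whose fields are all Send.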

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
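Before reaching for unsafe impl, a compile-time check can confirm whether a type is already Send. A minimal sketch, with hypothetical Task and LocalTask variants for illustration: a type made only of Send fields (u64, String, Arc) gets Send automatically, while one containing an Rc does not.

```rust
use std::rc::Rc;
use std::sync::Arc;

// Compile-time check: a call to this only compiles if `T: Send`.
fn assert_send<T: Send>() {}

// Hypothetical task type whose fields are all `Send`.
#[allow(dead_code)]
enum Task {
    Square(u64),
    Greet(String),
    Shared(Arc<Vec<u8>>),
}

// Holds an `Rc`, which is not `Send`, so `LocalTask` is not `Send` either.
#[allow(dead_code)]
enum LocalTask {
    Shared(Rc<Vec<u8>>),
}

fn main() {
    // No `unsafe impl` needed: the compiler implements `Send` for `Task` automatically.
    assert_send::<Task>();
    // assert_send::<LocalTask>(); // would not compile: `Rc` cannot be sent between threads safely
}
```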

  • This took quite a while, but I hope some of these examples help you and others :) (2 upvotes)