如何在 Rust 中实现一个长时间运行的进程并通过 Rest api 获得进展?

And*_*san 3 rust

我是 Rust 的初学者。

我有一个长时间运行的 IO 绑定进程,我想通过 REST API 生成和监视该进程。我按照本教程选择了Iron。监控意味着了解其进展和最终结果。

当我生成它时,我给它一个 id 并将该 id 映射到我可以获取以获取进度的资源。我不必精确地描述进展情况;我可以报告 5 秒前的进度。

我的第一次尝试是建立一个通道,通过该通道发送进度请求并接收状态。我被困在哪里存储接收器,因为根据我的理解,它只属于一个线程。我想将其放在请求的上下文中,但这不起作用,因为有不同的线程处理后续请求。

在 Rust 中执行此操作的惯用方法是什么?

我有一个示例项目

稍后编辑

这是一个独立的示例,它遵循示例原则作为答案,即每个线程更新其进度的映射:

extern crate iron;
extern crate router;
extern crate rustc_serialize;

use iron::prelude::*;
use iron::status;
use router::Router;
use rustc_serialize::json;
use std::io::Read;
use std::sync::{Mutex, Arc};

use std::thread;
use std::time::Duration;
use std::collections::HashMap;

#[derive(Debug, Clone, RustcEncodable, RustcDecodable)]
pub struct Status {
    pub progress: u64,
    pub context: String
}

#[derive(RustcEncodable, RustcDecodable)]
struct StartTask {
    id: u64
}

fn start_process(status: Arc<Mutex<HashMap<u64, Status>>>, task_id: u64) {
    let c = status.clone();
    thread::spawn(move || {
        for i in 1..100 {
            {
                let m = &mut c.lock().unwrap();
                m.insert(task_id, Status{ progress: i, context: "in progress".to_string()});
            }
            thread::sleep(Duration::from_secs(1));
        }
        let m = &mut c.lock().unwrap();
        m.insert(task_id, Status{ progress: 100, context: "done".to_string()});
    });
}

fn main() {
    let status: Arc<Mutex<HashMap<u64, Status>>> = Arc::new(Mutex::new(HashMap::new()));
    let status_clone: Arc<Mutex<HashMap<u64, Status>>> = status.clone();

    let mut router = Router::new();

    router.get("/:taskId", move |r: &mut Request| task_status(r, &status.lock().unwrap()));
    router.post("/start", move |r: &mut Request|
        start_task(r, status_clone.clone()));

    fn task_status(req: &mut Request, statuses: & HashMap<u64,Status>) -> IronResult<Response> {
        let ref task_id = req.extensions.get::<Router>().unwrap().find("taskId").unwrap_or("/").parse::<u64>().unwrap();
        let payload = json::encode(&statuses.get(&task_id)).unwrap();
        Ok(Response::with((status::Ok, payload)))
    }

    // Receive a message by POST and play it back.
    fn start_task(request: &mut Request, statuses: Arc<Mutex<HashMap<u64, Status>>>) -> IronResult<Response> {
        let mut payload = String::new();
        request.body.read_to_string(&mut payload).unwrap();
        let task_start_request: StartTask = json::decode(&payload).unwrap();
        start_process(statuses, task_start_request.id);
        Ok(Response::with((status::Ok, json::encode(&task_start_request).unwrap())))
    }

    Iron::new(router).http("localhost:3000").unwrap();
}
Run Code Online (Sandbox Code Playgroud)

mal*_*rbo 5

一种可能性是使用将HashMap每个工作人员 ID 与进度(和结果)相关联的全局变量。这是简单的例子(没有其余的东西)

#[macro_use]
extern crate lazy_static;

use std::sync::Mutex;
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

lazy_static! {
    static ref PROGRESS: Mutex<HashMap<usize, usize>> = Mutex::new(HashMap::new());
}

fn set_progress(id: usize, progress: usize) {
    // insert replaces the old value if there was one.
    PROGRESS.lock().unwrap().insert(id, progress);
}

fn get_progress(id: usize) -> Option<usize> {
    PROGRESS.lock().unwrap().get(&id).cloned()
}

fn work(id: usize) {
    println!("Creating {}", id);
    set_progress(id, 0);
    for i in 0..100 {
        set_progress(id, i + 1);
        // simulates work
        thread::sleep(Duration::new(0, 50_000_000));
    }
}

fn monitor(id: usize) {
    loop {
        if let Some(p) = get_progress(id) {
            if p == 100 {
                println!("Done {}", id);
                // to avoid leaks, remove id from PROGRESS.
                // maybe save that the task ends in a data base.
                return
            } else {
                println!("Progress {}: {}", id, p);
            }
        }
        thread::sleep(Duration::new(1, 0));
    }
}

fn main() {
    let w = thread::spawn(|| work(1));
    let m = thread::spawn(|| monitor(1));
    w.join().unwrap();
    m.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)