如何从产生数据块的慢速处理侧线程流式传输超级请求的正文?

nja*_*ard 5 rust hyper rust-tokio

我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生大量数据;每个响应可以是 1GiB,不适合内存,并且必须按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。

让我们跳过样板(service_fn, Server::bind)。

缓慢生成数据的 API 可能类似于

use std::io;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}

type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());

    // spawn a thread and call slow.next_block() until len()==0
    // each byte which comes from next_block should go to the client
    // as part of the Body
}
Run Code Online (Sandbox Code Playgroud)

请注意,这SlowData::new也是计算密集型的。

最理想的情况是,我们可以最小化副本并将其&[u8]直接发送到 hyper,而无需将其复制到某个Vec文件或其他文件中。

如何从侧线程完成超级请求的正文?

She*_*ter 6

启动线程池中的线程并通过通道发送数据块。通道实现Stream和超级可以使用Body以下方式构建:Streamwrap_stream

use futures::{channel::mpsc, executor::ThreadPool, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Response, Server,
}; // 0.13.1
use std::{convert::Infallible, io, thread, time::Duration};
use tokio; // 0.2.6, features = ["macros"]

struct SlowData;
impl SlowData {
    fn new(_initial: &str) -> SlowData {
        thread::sleep(Duration::from_secs(1));
        Self
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        thread::sleep(Duration::from_secs(1));
        Ok(b"data")
    }
}

fn stream(pool: ThreadPool) -> impl Stream<Item = io::Result<Vec<u8>>> {
    let (mut tx, rx) = mpsc::channel(10);

    pool.spawn(async move {
        let sd = SlowData::new("dummy");

        for _ in 0..3 {
            let block = sd.next_block().map(|b| b.to_vec());
            tx.send(block).await.expect("Unable to send block");
        }
    })
    .expect("Unable to spawn thread");

    rx
}

#[tokio::main]
async fn main() {
    // Construct our SocketAddr to listen on...
    let addr = ([127, 0, 0, 1], 3000).into();

    // Create a threadpool (cloning is cheap)...
    let pool = ThreadPool::new().unwrap();

    // Handle each connection...
    let make_service = make_service_fn(|_socket| {
        let pool = pool.clone();

        async {
            // Handle each request...
            let svc_fn = service_fn(move |_request| {
                let pool = pool.clone();

                async {
                    let data = stream(pool);
                    let resp = Response::new(Body::wrap_stream(data));

                    Result::<_, Infallible>::Ok(resp)
                }
            });

            Result::<_, Infallible>::Ok(svc_fn)
        }
    });

    // Bind and serve...
    let server = Server::bind(&addr).serve(make_service);

    // Finally, run the server
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
Run Code Online (Sandbox Code Playgroud)

创建线程时,无法避免将切片复制到Vec.

也可以看看: