Rust 超级流媒体功能

Adr*_*agy 5 streaming rust hyper

我目前正在尝试通过 rust with hyper (0.13) 中的简单 HTTP get 请求来实现数据流。这个想法很简单。如果客户端发送请求,服务器每 5 秒以“块”形式响应,使连接始终保持打开状态。

我正在尝试从 futures 箱中实现特征 Stream

impl Stream for MonitorString {
    type Item = Result<String, serde_json::Error>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<String, serde_json::Error>>> {
        thread::sleep(Duration::from_secs(5)); 

        Poll::Ready(Some(self.data.to_string()))
    }
}
Run Code Online (Sandbox Code Playgroud)

然后建立一个响应,例如

type StreamingServiceResult = Result<Response<Body>, Box<dyn std::error::Error + Sync + Send>>;

pub async fn handle_request(_: Request<Body>, params: Params, _: Query) -> StreamingServiceResult {
     Ok(Response::builder()
        .header(hyper::header::CONTENT_TYPE, "application/json")
        .header(hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
        .header(hyper::header::TRANSFER_ENCODING, "chunked")
        .body(Body::wrap_stream(MonitorString::new(...)))
}
Run Code Online (Sandbox Code Playgroud)

当我向服务器发送请求时,它会打开连接并挂起。通过一些调试,我看到调用了 poll_next 函数,但仅当流没有更多内容可产生时才发送响应(poll::Ready(None) 由 poll_next 返回)并整体发送。

我可以看到我对 Stream 特征和 Body::wrap_stream 的最初理解是错误的。遗憾的是,我找不到任何适合我的用例的示例。你能让我走上正确的道路吗?