l3u*_*fly 3 multithreading rust rust-tokio rust-tonic
我有一个设置,我的程序使用std::thread::spawn
.
我需要一个 GRPC 服务器来处理传入的命令以及工作线程完成的流输出。我用于tonic
GRPC 服务器,它仅在 Tokio future 内提供异步实现。
我需要能够从我的“正常”标准库线程向 Tokio 未来发送消息。
我在这里将我的代码简化为最低限度:
use std::thread;
use tokio::sync::mpsc; // 1.9.0
fn main() {
let (tx, mut rx) = mpsc::channel(1);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.spawn(async move {
// the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
while let Some(v) = rx.recv().await {}
});
let h = thread::spawn(move || {
// do work
tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
});
h.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)
我的主工作线程如何与 Tokio 生成的 GRPC 服务器通信?
您可以使用 tokio 的sync
功能。有两个选项 -UnboundedSender
和Sender::blocking_send()
。
无界发送者的问题是它没有背压,如果你的生产者比消费者更快,你的应用程序可能会因内存不足错误而崩溃,或者耗尽生产者使用的其他有限资源。
作为一般规则,您应该避免使用无界队列,这使我们有更好的选择blocking_send()
:
游乐场:
use std::thread;
use tokio::sync::mpsc; // 1.9.0
fn main() {
let (tx, mut rx) = mpsc::channel(1);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.spawn(async move {
// the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
while let Some(v) = rx.recv().await {
println!("Received: {:?}", v);
}
});
let h = thread::spawn(move || {
// do work
tx.blocking_send(1).unwrap();
});
h.join().unwrap();
}
Run Code Online (Sandbox Code Playgroud)