Fra*_*zzi 4 ffi rust async-await
我在想我的想法是否可以实现:一个完全支持异步 Rust 的动态库插件管理器。
实现分为三个主要部分(github repo):
my-interface,一个async_trait(具有“正常”特征的方法似乎工作正常,不知道到什么程度)
use async_trait::async_trait;
#[async_trait]
pub trait SayHelloService {
async fn say_hello(&self);
}
Run Code Online (Sandbox Code Playgroud)
my-master,加载.dll/.so并从库中调用一个创建函数,给定 a tokio::runtime::Handle,返回一个Box<dyn SayHelloService>
use my_interface::SayHelloService;
use tokio::{self, runtime::Handle};
#[tokio::main]
async fn main() {
let lib = libloading::Library::new("target/debug/libmy_plugin.so").expect("load library");
let new_service: libloading::Symbol<fn(Handle) -> Box<dyn SayHelloService>> =
unsafe { lib.get(b"new_service") }.expect("load symbol");
let service1 = new_service(Handle::current());
let service2 = new_service(Handle::current());
let _ = tokio::join!(service1.say_hello(), service2.say_hello());
}
Run Code Online (Sandbox Code Playgroud)
my-plugin,实现SayHelloService,这是阻止我的代码
use async_trait::async_trait;
use my_interface::SayHelloService;
use tokio::{self, runtime::Handle};
#[no_mangle]
pub fn new_service(handle: Handle) -> Box<dyn SayHelloService> {
Box::new(PluginSayHello::new(handle))
}
pub struct PluginSayHello {
id: String,
handle: Handle,
}
impl PluginSayHello {
fn new(handle: Handle) -> PluginSayHello {
let id = format!("{:08x}", rand::random::<u32>());
println!("[{}] Created instance!", id);
PluginSayHello { id, handle }
}
}
#[async_trait]
impl SayHelloService for PluginSayHello {
// this errors with "future cannot be sent between threads safely"
async fn say_hello(&self) {
// this should enable you to call tokio::sleep but EnterGuard is not Send :(
// https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.enter
let _guard = self.handle.enter();
println!("[{}] Hello from plugin!", self.id);
let _ = tokio::spawn(async move {
let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("sleep 1");
})
.await;
}
}
Run Code Online (Sandbox Code Playgroud)
无法调用self.handle.enter()会产生各种奇怪的行为,例如:(
如果您想要更多信息,我将附上崩溃日志)
#[async_trait]
impl SayHelloService for PluginSayHello {
async fn say_hello(&self) {
let id = self.id.clone();
let _ = self
.handle
.spawn_blocking(move || {
println!("[{}] Hello from plugin!", id);
// internal code of reqwest just crashes
let body = reqwest::get("https://www.rust-lang.org")
.await?
.text()
.await?;
println!("body = {:?}", body);
})
.await;
}
}
Run Code Online (Sandbox Code Playgroud)
我还实现了 的工作实现PluginSayHello,但对我来说这并不是完全的胜利。
#[async_trait]
impl SayHelloService for PluginSayHello {
async fn say_hello(&self) {
let id = self.id.clone();
let _ = self
.handle
.spawn(async move {
println!("[{}] Hello from plugin!", id);
// calling tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// errors with "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
let _ = sleep(Duration::new(1, 0));
println!("slept 1");
println!("[{}] Hello again from plugin!", id);
})
.await;
}
}
Run Code Online (Sandbox Code Playgroud)
总结一下:
这是一个看似不平凡的场景。
首先让我们退后一步,解释一下为什么事情是这样的。I/O 和计时器的异步原语在一起管理时效果最佳,因为需要一些线程处理操作系统事件。TokioRuntime当然提供了这一点,但为了避免手动传递句柄以向运行时注册计时器和 I/O 调用,有一个线程本地“当前”变量,用于存储正在执行当前任务的运行时的句柄。因此,当您执行 Tokio 特定的操作时,例如tokio::time::sleep它将使用该线程局部变量来访问运行时的时间管理器。如果没有“当前”,您会收到“没有反应堆正在运行”错误。
这成为您的插件系统中的一个问题,因为tokio将在每个编译的二进制文件(主文件和插件)中静态链接,这意味着它们将具有不同的线程局部变量:即插件中的“当前”内容与插件中的“当前”内容不同对于主服务器来说是“当前”的,即使使用相同的线程也是如此。因此,看来您已尝试手动传递句柄并使用,.enter()但此解决方案不太正确。
您遇到的一个问题是,这会导致您的异步函数出现!Send,如果您不需要它们,则可以通过Send使用来解决#[async_trait(?Send)],但我假设您需要。另一个问题是,如果任务可以在线程之间移动,则仅设置当前线程的handle.enter()当前运行时间,并且您的任务可能在某个时刻移动到不同的线程。
您需要的是让您的任务仅在执行时才async具有“当前”运行时。您可以使用包装器来做到这一点:Future
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use tokio::runtime::Handle;
use pin_project_lite::pin_project;
pin_project! {
struct WithRuntime<F> {
runtime: Handle,
#[pin]
fut: F,
}
}
impl<F> WithRuntime<F> {
fn new(runtime: Handle, fut: F) -> Self {
Self { runtime, fut }
}
}
impl<F> Future for WithRuntime<F>
where
F: Future
{
type Output = F::Output;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.runtime.enter();
this.fut.poll(ctx)
}
}
Run Code Online (Sandbox Code Playgroud)
然后你可以使用它WithRuntime::new(handle, async { ... }).await(完整代码在这里)。将它与你当前的特质相结合的确切方法可能会很尴尬,但我认为无论如何它都需要改变。
当前设置的另一个问题是您假设一个运行时将始终调用您的服务。这可能不是真的;有像 Actix-Web 中那样的配置,其中运行多个单线程运行时。我认为您的代码仍然可以工作,但可能不是最佳的。因此,您可能应该在每个异步方法调用上传递“当前”句柄。您可能可以创建一个宏来为插件编写者透明地处理这个问题。
最后,应该警告您,Rust 的 ABI 并不稳定。您所拥有的内容不能保证在编译器版本之间工作(或者甚至使用相同的编译器?)。它很可能大部分会起作用,但仍然不能保证。通常你会设计一个合适的 FFI(外部函数接口)层来进行通信,但是 Rust 所需的类型(Poll, Context, Box<dyn Future>)不是 FFI 安全的,所以你需要一个 FFI 安全的垫片(比如来自stable 的 shim,你应该完全检查一下) )让它真正发挥作用(尽管仍然不确定你会做什么Handle)。
而且可能还有其他东西我错过了,因为我不确定为什么handle.spawn(async { ... })不起作用。
| 归档时间: |
|
| 查看次数: |
199 次 |
| 最近记录: |