Rust 中的库插件管理器现在可行吗?

Fra*_*zzi 4 ffi rust async-await

我在想我的想法是否可以实现:一个完全支持异步 Rust 的动态库插件管理器。

实现分为三个主要部分(github repo):

  1. my-interface,一个async_trait(具有“正常”特征的方法似乎工作正常,不知道到什么程度)

    use async_trait::async_trait;
    #[async_trait]
    pub trait SayHelloService {
        async fn say_hello(&self);
    }
    
    Run Code Online (Sandbox Code Playgroud)
  2. my-master,加载.dll/.so并从库中调用一个创建函数,给定 a tokio::runtime::Handle,返回一个Box<dyn SayHelloService>

    use my_interface::SayHelloService;
    use tokio::{self, runtime::Handle};
    
    #[tokio::main]
    async fn main() {
        let lib = libloading::Library::new("target/debug/libmy_plugin.so").expect("load library");
        let new_service: libloading::Symbol<fn(Handle) -> Box<dyn SayHelloService>> =
            unsafe { lib.get(b"new_service") }.expect("load symbol");
        let service1 = new_service(Handle::current());
        let service2 = new_service(Handle::current());
        let _ = tokio::join!(service1.say_hello(), service2.say_hello());
    }
    
    Run Code Online (Sandbox Code Playgroud)
  3. my-plugin,实现SayHelloService,这是阻止我的代码

    use async_trait::async_trait;
    use my_interface::SayHelloService;
    use tokio::{self, runtime::Handle};
    
    #[no_mangle]
    pub fn new_service(handle: Handle) -> Box<dyn SayHelloService> {
        Box::new(PluginSayHello::new(handle))
    }
    
    pub struct PluginSayHello {
        id: String,
        handle: Handle,
    }
    
    impl PluginSayHello {
        fn new(handle: Handle) -> PluginSayHello {
            let id = format!("{:08x}", rand::random::<u32>());
            println!("[{}] Created instance!", id);
            PluginSayHello { id, handle }
        }
    }
    
    #[async_trait]
    impl SayHelloService for PluginSayHello {
        // this errors with "future cannot be sent between threads safely"
        async fn say_hello(&self) {
            // this should enable you to call tokio::sleep but EnterGuard is not Send :(
            // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.enter
            let _guard = self.handle.enter();
            println!("[{}] Hello from plugin!", self.id);
            let _ = tokio::spawn(async move {
                let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("sleep 1");
            })
            .await;
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)

无法调用self.handle.enter()会产生各种奇怪的行为,例如:(
如果您想要更多信息,我将附上崩溃日志)

#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn_blocking(move || {
                println!("[{}] Hello from plugin!", id);
                // internal code of reqwest just crashes
                let body = reqwest::get("https://www.rust-lang.org")
                    .await?
                    .text()
                    .await?;
                println!("body = {:?}", body);
            })
            .await;
    }
}
Run Code Online (Sandbox Code Playgroud)

我还实现了 的工作实现PluginSayHello,但对我来说这并不是完全的胜利。

#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn(async move {
                println!("[{}] Hello from plugin!", id);
                // calling tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                // errors with "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
                let _ = sleep(Duration::new(1, 0));
                println!("slept 1");
                println!("[{}] Hello again from plugin!", id);
            })
            .await;
    }
}
Run Code Online (Sandbox Code Playgroud)

总结一下:

  • 这种方法在 Rust 中是否可行?
  • 你发现我的逻辑有哪些错误?(我很确定有很多)
  • 您能给我指出一个解决方案或完全不同的方法吗?

kmd*_*eko 5

这是一个看似不平凡的场景。

首先让我们退后一步,解释一下为什么事情是这样的。I/O 和计时器的异步原语在一起管理时效果最佳,因为需要一些线程处理操作系统事件。TokioRuntime当然提供了这一点,但为了避免手动传递句柄以向运行时注册计时器和 I/O 调用,有一个线程本地“当前”变量,用于存储正在执行当前任务的运行时的句柄。因此,当您执行 Tokio 特定的操作时,例如tokio::time::sleep它将使用该线程局部变量来访问运行时的时间管理器。如果没有“当前”,您会收到“没有反应堆正在运行”错误。

这成为您的插件系统中的一个问题,因为tokio将在每个编译的二进制文件(主文件和插件)中静态链接,这意味着它们将具有不同的线程局部变量:即插件中的“当前”内容与插件中的“当前”内容不同对于主服务器来说是“当前”的,即使使用相同的线程也是如此。因此,看来您已尝试手动传递句柄并使用,.enter()但此解决方案不太正确。

您遇到的一个问题是,这会导致您的异步函数出现!Send,如果您不需要它们,则可以通过Send使用来解决#[async_trait(?Send)],但我假设您需要。另一个问题是,如果任务可以在线程之间移动,则仅设置当前线程的handle.enter()当前运行时间,并且您的任务可能在某个时刻移动到不同的线程。

您需要的是让您的任务仅在执行时才async具有“当前”运行时。您可以使用包装器来做到这一点:Future

use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;

use tokio::runtime::Handle;
use pin_project_lite::pin_project;

pin_project! {
    struct WithRuntime<F> {
        runtime: Handle,
        #[pin]
        fut: F,
    }
}

impl<F> WithRuntime<F> {
    fn new(runtime: Handle, fut: F) -> Self {
        Self { runtime, fut }
    }
}

impl<F> Future for WithRuntime<F>
where
    F: Future
{
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let _guard = this.runtime.enter();
        this.fut.poll(ctx)
    }
}
Run Code Online (Sandbox Code Playgroud)

然后你可以使用它WithRuntime::new(handle, async { ... }).await(完整代码在这里)。将它与你当前的特质相结合的确切方法可能会很尴尬,但我认为无论如何它都需要改变。

当前设置的另一个问题是您假设一个运行时将始终调用您的服务。这可能不是真的;有像 Actix-Web 中那样的配置,其中运行多个单线程运行时。我认为您的代码仍然可以工作,但可能不是最佳的。因此,您可能应该在每个异步方法调用上传递“当前”句柄。您可能可以创建一个宏来为插件编写者透明地处理这个问题。

最后,应该警告您,Rust 的 ABI 并不稳定。您所拥有的内容不能保证在编译器版本之间工作(或者甚至使用相同的编译器?)。它很可能大部分起作用,但仍然不能保证。通常你会设计一个合适的 FFI(外部函数接口)层来进行通信,但是 Rust 所需的类型(Poll, Context, Box<dyn Future>)不是 FFI 安全的,所以你需要一个 FFI 安全的垫片(比如来自stable 的 shim,你应该完全检查一下) )让它真正发挥作用(尽管仍然不确定你会做什么Handle)。

而且可能还有其他东西我错过了,因为我不确定为什么handle.spawn(async { ... })不起作用。