gar*_*ing 1 daemon rust rust-actix actix-web
给定一个带有 Actix的WebSocket 服务器的基本设置,我如何在我的消息处理程序中启动一个守护进程?
我已经扩展了上面链接的示例启动代码以daemon(false, true)使用fork crate进行调用。
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use fork::{daemon, Fork};
/// Define HTTP actor
struct MyWs;
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
}
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
println!("text message received");
if let Ok(Fork::Child) = daemon(false, true) {
println!("from daemon: this print but then the websocket crashes!");
};
ctx.text(text)
},
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
_ => (),
}
}
}
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let resp = ws::start(MyWs {}, &req, stream);
println!("{:?}", resp);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().route("/ws/", web::get().to(index)))
.bind("127.0.0.1:8080")?
.run()
.await
}
Run Code Online (Sandbox Code Playgroud)
上面的代码启动了服务器,但是当我向它发送消息时,我收到了一个Panic in Arbiter thread.
text message received
from daemon: this print but then the websocket crashes!
thread 'actix-rt:worker:0' panicked at 'failed to park', /Users/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/runtime/basic_scheduler.rs:158:56
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.
Run Code Online (Sandbox Code Playgroud)
您的应用程序的问题在于 actix-web 运行时(即 Tokio)是多线程的。这是一个问题,因为fork()调用(由 内部使用daemon())仅复制调用fork().
即使你的父进程有 N 个线程,你的子进程也只有 1 个。如果你的父进程有任何被这些线程锁定的互斥锁,它们的状态将在子进程中复制,但由于这些线程不存在那里,它们将永远保持锁定状态。
如果你有一个Rc/Arc它永远不会取消分配它的内存,因为它永远不会被删除,因此它的内部计数永远不会达到零。这同样适用于任何指针和共享状态。
或者更简单地说 -你分叉的孩子最终会处于未定义状态。
这在Calling fork() in a Multithreaded Environment 中得到了最好的解释:
fork() 系统调用创建了调用它的地址空间的精确副本,导致两个地址空间执行相同的代码。如果在 fork() 时分叉地址空间有多个线程在执行,则可能会出现问题。当多线程是库调用的结果时,线程不一定知道彼此的存在、目的、操作等。假设其他线程之一(除执行 fork( ) 的线程之外的任何线程)负责从您的支票帐户中扣款。显然,您不希望这种情况因其他线程决定调用 fork() 而发生两次。
由于这些类型的问题,通常是线程修改持久状态的问题,POSIX 定义了 fork() 在线程存在的情况下的行为,以仅传播分叉线程。这解决了对持久状态进行不当更改的问题。但是,它会导致其他问题,如下一段所述。
在 POSIX 模型中,仅传播分叉线程。所有其他线程都被淘汰,恕不另行通知;不发送取消,也不运行处理程序。然而,地址空间的所有其他部分都被克隆,包括所有互斥状态。如果其他线程锁定了互斥锁,则该互斥锁将在子进程中被锁定,但锁所有者将不存在来解锁它。因此,被锁保护的资源将永久不可用。
在这里,您可以找到更可靠的来源,并提供更多详细信息
“如何在我的消息处理程序中启动守护进程?”
我假设您想在 accept()模型上实现经典的 unix “fork()”模型。在这种情况下,你就不走运了,因为诸如 actix-web 和 async/await 之类的服务器一般都没有考虑到这一点。即使你有一个单线程的 async/await 服务器,那么:
当一个孩子被分叉时,它会从父级继承所有文件描述符。所以在 fork 之后,孩子关闭它的监听套接字以避免资源泄漏是很常见的 - 但是在任何基于 async/await 的服务器上都没有办法做到这一点,不是因为它不可能做到,而是因为它是未实现。
更重要的原因是为了防止子进程接受新的连接——因为即使你运行一个单线程服务器,它仍然能够同时处理许多任务——即当你的处理程序调用.await某些东西时,接受者将是自由地接受一个新连接(通过从套接字的队列中窃取它)并开始处理它。
你的父服务器可能已经产生了很多任务,这些任务会在每个分叉的孩子中复制,因此在每个进程中独立地多次执行相同的事情
嗯……在我熟悉的任何基于 async/await 的服务器上,都无法阻止任何这种情况。您需要一个自定义服务器:
换句话说 - async/await 和“fork() on accept()”是用于并发处理任务的两种不同 且不兼容的模型。
一个可能的解决方案是拥有一个非异步接受器守护进程,它只接受连接和分叉本身。然后在孩子中产生一个网络服务器,然后将接受的套接字提供给它。但是尽管可能,但目前没有任何服务器支持。