Ram*_*sen 2 asynchronous future mongodb rust
run_transaction我正在尝试为 Rust MongoDB 驱动程序编写一个简单的函数
该函数尝试通过 mongo db 客户端执行事务,并在遇到可重试错误时重试该事务
这是该函数的最小可重现示例。
use mongodb::{Client, Collection, ClientSession};
use mongodb::bson::Document;
use std::future::Future;
pub enum Never {}
fn main() {
run_transaction(|mut session| async move {
let document = collection().find_one_with_session(None, None, &mut session).await?.unwrap();
let r: Result<Document, TransactionError<Never>> = Ok(document);
return r;
});
}
fn collection() -> Collection<Document> {
unimplemented!();
}
fn client() -> Client {
unimplemented!();
}
pub enum TransactionError<E> {
Mongodb(mongodb::error::Error),
Custom(E)
}
impl<T> From<mongodb::error::Error> for TransactionError<T> {
fn from(e: mongodb::error::Error) -> Self {
TransactionError::Mongodb(e)
}
}
// declaration
pub async fn run_transaction<T, E, F, Fut>(f: F) -> Result<T, TransactionError<E>>
where for<'a>
F: Fn(&'a mut ClientSession) -> Fut + 'a,
Fut: Future<Output = Result<T, TransactionError<E>>> {
let mut session = client().start_session(None).await?;
session.start_transaction(None).await?;
'run: loop {
let r = f(&mut session).await;
match r {
Err(e) => match e {
TransactionError::Custom(e) => return Err(TransactionError::Custom(e)),
TransactionError::Mongodb(e) => {
if !e.contains_label(mongodb::error::TRANSIENT_TRANSACTION_ERROR) {
return Err(TransactionError::Mongodb(e));
} else {
continue 'run;
}
}
},
Ok(v) => {
'commit: loop {
match session.commit_transaction().await {
Ok(()) => return Ok(v),
Err(e) => {
if e.contains_label(mongodb::error::UNKNOWN_TRANSACTION_COMMIT_RESULT) {
continue 'commit;
} else {
return Err(TransactionError::Mongodb(e))
}
}
}
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
但借用检查员不断抱怨此消息:
error: lifetime may not live long enough
--> src/main.rs:8:35
|
8 | run_transaction(|mut session| async move {
| ______________________------------_^
| | | |
| | | return type of closure `impl Future` contains a lifetime `'2`
| | has type `&'1 mut ClientSession`
9 | | let document = collection().find_one_with_session(None, None, &mut session).await?.unwrap();
10 | | let r: Result<Document, TransactionError<Never>> = Ok(document);
11 | | return r;
12 | | });
| |_____^ returning this value requires that `'1` must outlive `'2`
Run Code Online (Sandbox Code Playgroud)
我有办法解决这个问题吗?
你真正需要的是这样的:
pub async fn run_transaction<T, E, F, Fut>(f: F) -> Result<T, TransactionError<E>>
where
for<'a>
F: Fn(&'a mut ClientSession) -> Fut,
Fut: Future<Output = Result<T, TransactionError<E>>> + 'a {
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不起作用,因为 中定义的“更高等级特征界限”(HRTB)for<'a>仅适用于下一个界限,而不是每个界限,并且没有办法连接两个生命周期......
但并不是一切都失去了!我在 Rust 支持论坛中发现了这个问题,它也有类似的问题,可以根据您的问题进行调整。基本思想是创建一个具有相同生命周期的包含Fn和边界的特征:Future
pub trait XFn<'a, I: 'a, O> {
type Output: Future<Output = O> + 'a;
fn call(&self, session: I) -> Self::Output;
}
impl<'a, I: 'a, O, F, Fut> XFn<'a, I, O> for F
where
F: Fn(I) -> Fut,
Fut: Future<Output = O> + 'a,
{
type Output = Fut;
fn call(&self, x: I) -> Fut {
self(x)
}
}
Run Code Online (Sandbox Code Playgroud)
现在你的绑定函数很简单:
pub async fn run_transaction<T, E, F>(f: F) -> Result<T, TransactionError<E>>
where for<'a>
F: XFn<'a, &'a mut ClientSession, Result<T, TransactionError<E>>>
Run Code Online (Sandbox Code Playgroud)
请记住,要调用您必须编写的函数f.call(&mut session)。
不幸的是,对 的调用run_transaction()无法编译,说明 的实现FnOnce不够通用。async move我认为这是异步闭包不稳定的限制/错误。但您可以使用适当的异步函数:
async fn do_the_thing(session: &mut ClientSession) -> Result<Document, TransactionError<Never>> {
let document = collection().find_one_with_session(None, None, session).await?.unwrap();
let r: Result<Document, TransactionError<Never>> = Ok(document);
return r;
}
run_transaction(do_the_thing).await;
Run Code Online (Sandbox Code Playgroud)
如果您认为这太复杂,并且不介意支付非常小的运行时价格,那么还有一个更简单的选择:您可以将返回的 future 装箱,完全避免第二个泛型:
pub async fn run_transaction<T, E, F>(f: F) -> Result<T, TransactionError<E>>
where for<'a>
F: Fn(&'a mut ClientSession) -> Pin<Box<dyn Future<Output = Result<T, TransactionError<E>>> + 'a>>
Run Code Online (Sandbox Code Playgroud)
然后,调用它:
run_transaction(|mut session| Box::pin(async move {
let document = collection().find_one_with_session(None, None, session).await?.unwrap();
let r: Result<Document, TransactionError<Never>> = Ok(document);
return r;
}));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
961 次 |
| 最近记录: |