重构函数时如何避免出现生命周期问题?

dop*_*umi 5 rust async-await

如果您想直接跳到代码中,请使用 Playground 。

问题

我正在尝试实现一个filter_con<T, F>(v: Vec<T>, predicate: F)允许Vec通过谓词对 ,进行并发过滤的函数async

也就是说,不要这样做:

let arr = vec![...];
let arr_filtered = join_all(arr.into_iter().map(|it| async move {
    if some_getter(&it).await > some_value {
        Some(it)
    } else {
        None
    }
}))
.await
.into_iter()
.filter_map(|it| it)
.collect::<Vec<T>>()
Run Code Online (Sandbox Code Playgroud)

每次我需要过滤 a 时Vec,我希望能够:

let arr = vec![...];
let arr_filtered = filter_con(arr, |it| async move { 
  some_getter(&it).await > some_value 
}).await
Run Code Online (Sandbox Code Playgroud)

暂定实施

我已将该函数提取到自己的函数中,但遇到了生命周期问题

async fn filter_con<T, B, F>(arr: Vec<T>, predicate: F) -> Vec<T>
where
    F: FnMut(&T) -> B,
    B: futures::Future<Output = bool>,
{
    join_all(arr.into_iter().map(|it| async move {
        if predicate(&it).await {
            Some(it)
        } else {
            None
        }
    }))
    .await
    .into_iter()
    .filter_map(|p| p)
    .collect::<Vec<_>>()
}
Run Code Online (Sandbox Code Playgroud)
error[E0507]: cannot move out of a shared reference
Run Code Online (Sandbox Code Playgroud)

我不知道我要从谓词中移出什么?

欲了解更多详情,请参阅游乐场

cam*_*024 5

您将无法将谓词设为 an FnOnce,因为,如果您的 中有 10 个项目Vec,则需要调用该谓词 10 次,但 anFnOnce只保证它可以被调用一次,这可能会导致类似这样的结果:

let vec = vec![1, 2, 3];
let has_drop_impl = String::from("hello");

filter_con(vec, |&i| async {
  drop(has_drop_impl);
  i < 5
}
Run Code Online (Sandbox Code Playgroud)

所以F必须是 anFnMut或 an Fn。标准库Iterator::filter采用FnMut,尽管这可能会造成混乱(需要可变引用的是闭的捕获变量,而不是迭代器的元素)。

因为谓词是 an FnMut,所以任何调用者都需要能够获得 an &mut F。对于Iterator::filter,这可以用来执行以下操作:

let vec = vec![1, 2, 3];
let mut count = 0;

vec.into_iter().filter(|&x| {
  count += 1;  // this line makes the closure an `FnMut`
  x < 2
})
Run Code Online (Sandbox Code Playgroud)

然而,通过将迭代器发送到join_all,您实际上允许异步运行时根据需要安排这些调用,可能同时进行,这会导致别名&mut T,这始终是未定义的行为。这个问题有同一问题的稍微精简的版本https://github.com/rust-lang/rust/issues/69446

我仍然不是 100% 了解细节,但编译器似乎在这里很保守,甚至不允许您首先创建闭包以防止健全性问题。

我建议让你的函数只接受Fns. 这样,您的运行时就可以随意调用该函数。这确实意味着您的闭包不能具有可变状态,但这在 tokio 应用程序中不太可能成为问题。对于计数示例,“正确”的解决方案是使用AtomicUsize(或等效的),它允许通过共享引用进行突变。如果您在调用中引用可变状态filter,它应该是线程安全的,并且线程安全数据结构通常允许通过共享引用进行突变。

鉴于该限制,以下给出了您期望的答案:

async fn filter_con<T, B, F>(arr: Vec<T>, predicate: F) -> Vec<T>
where
    F: Fn(&T) -> B,
    B: Future<Output = bool>,
{
    join_all(arr.into_iter().map(|it| async {
        if predicate(&it).await {
            Some(it)
        } else {
            None
        }
    }))
    .await
    .into_iter()
    .filter_map(|p| p)
    .collect::<Vec<_>>()
}
Run Code Online (Sandbox Code Playgroud)

操场