如果您想直接跳到代码中,请使用 Playground 。
我正在尝试实现一个filter_con<T, F>(v: Vec<T>, predicate: F)
允许Vec
通过谓词对 ,进行并发过滤的函数async
。
也就是说,不要这样做:
let arr = vec![...];
let arr_filtered = join_all(arr.into_iter().map(|it| async move {
if some_getter(&it).await > some_value {
Some(it)
} else {
None
}
}))
.await
.into_iter()
.filter_map(|it| it)
.collect::<Vec<T>>()
Run Code Online (Sandbox Code Playgroud)
每次我需要过滤 a 时Vec
,我希望能够:
let arr = vec![...];
let arr_filtered = filter_con(arr, |it| async move {
some_getter(&it).await > some_value
}).await
Run Code Online (Sandbox Code Playgroud)
我已将该函数提取到自己的函数中,但遇到了生命周期问题
async fn filter_con<T, B, F>(arr: Vec<T>, predicate: F) -> Vec<T>
where
F: FnMut(&T) -> B,
B: futures::Future<Output = bool>,
{
join_all(arr.into_iter().map(|it| async move {
if predicate(&it).await {
Some(it)
} else {
None
}
}))
.await
.into_iter()
.filter_map(|p| p)
.collect::<Vec<_>>()
}
Run Code Online (Sandbox Code Playgroud)
error[E0507]: cannot move out of a shared reference
Run Code Online (Sandbox Code Playgroud)
我不知道我要从谓词中移出什么?
欲了解更多详情,请参阅游乐场。
您将无法将谓词设为 an FnOnce
,因为,如果您的 中有 10 个项目Vec
,则需要调用该谓词 10 次,但 anFnOnce
只保证它可以被调用一次,这可能会导致类似这样的结果:
let vec = vec![1, 2, 3];
let has_drop_impl = String::from("hello");
filter_con(vec, |&i| async {
drop(has_drop_impl);
i < 5
}
Run Code Online (Sandbox Code Playgroud)
所以F
必须是 anFnMut
或 an Fn
。标准库Iterator::filter
采用FnMut
,尽管这可能会造成混乱(需要可变引用的是闭包的捕获变量,而不是迭代器的元素)。
因为谓词是 an FnMut
,所以任何调用者都需要能够获得 an &mut F
。对于Iterator::filter
,这可以用来执行以下操作:
let vec = vec![1, 2, 3];
let mut count = 0;
vec.into_iter().filter(|&x| {
count += 1; // this line makes the closure an `FnMut`
x < 2
})
Run Code Online (Sandbox Code Playgroud)
然而,通过将迭代器发送到join_all
,您实际上允许异步运行时根据需要安排这些调用,可能同时进行,这会导致别名&mut T
,这始终是未定义的行为。这个问题有同一问题的稍微精简的版本https://github.com/rust-lang/rust/issues/69446。
我仍然不是 100% 了解细节,但编译器似乎在这里很保守,甚至不允许您首先创建闭包以防止健全性问题。
我建议让你的函数只接受Fn
s. 这样,您的运行时就可以随意调用该函数。这确实意味着您的闭包不能具有可变状态,但这在 tokio 应用程序中不太可能成为问题。对于计数示例,“正确”的解决方案是使用AtomicUsize
(或等效的),它允许通过共享引用进行突变。如果您在调用中引用可变状态filter
,它应该是线程安全的,并且线程安全数据结构通常允许通过共享引用进行突变。
鉴于该限制,以下给出了您期望的答案:
async fn filter_con<T, B, F>(arr: Vec<T>, predicate: F) -> Vec<T>
where
F: Fn(&T) -> B,
B: Future<Output = bool>,
{
join_all(arr.into_iter().map(|it| async {
if predicate(&it).await {
Some(it)
} else {
None
}
}))
.await
.into_iter()
.filter_map(|p| p)
.collect::<Vec<_>>()
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
117 次 |
最近记录: |