我面临着一种情况,我需要从对象的放置处理程序运行异步代码。整个应用程序在 tokio 异步上下文中运行,因此我知道 drop 处理程序是使用活动的 tokio 运行时调用的,但不幸的是 drop 本身是一个同步函数。
理想情况下,我想要一个适用于多线程和当前线程运行时的解决方案,但如果不存在,那么我可以使用阻止删除线程并依赖其他线程来驱动的解决方案期货。
我考虑了多种选择,但我不确定哪种方法最好,也不了解它们的权衡。对于这些示例,我们假设我的类有一个async terminate(&mut self)我希望从 调用的函数drop()。
struct MyClass;
impl MyClass {
    async fn terminate(&mut self) {}
}
选项1:tokio::runtime::Handle::block_on
impl Drop for MyClass {
    fn drop(&mut self) {
        tokio::runtime::Handle::current().block_on(self.terminate());
    }
}
这似乎是最直接的方法,但不幸的是它会引起恐慌
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
看操场
我对此有点困惑,因为我认为Handle::block_on会使用当前正在运行的运行时,但它似乎试图启动一个新的运行时?这里发生了什么? …
我想做这样的事情:
// NOTE: This doesn't compile
struct A { v: u32 }
async fn foo<
    C: for<'a> FnOnce(&'a A) -> Pin<Box<dyn Future<Output = ()> + 'a>>
>(c: C) {
    c(&A {
        v: 8,
    }).await
}
#[tokio::main]
async fn main() {
    let t = 9;
    foo(|a| async {
        println!("{} {}", t, a.v);
    }.boxed_local()).await;
}
函数foo采用“异步闭包”,为其提供引用,并且“异步闭包”也允许通过引用捕获事物。上面的编译状态t的生命周期需要是“静态的”,这对我来说很有意义。
但我不确定我是否理解为什么当我将通用生命周期放在传递给它编译的“异步闭包”的引用类型上时:
struct A<'a> { v: u32, _phantom: std::marker::PhantomData<&'a ()> }
async fn foo<
    'b,
    C: for<'a> FnOnce(&'a A<'b>) -> …我想运行类似以下代码的代码:
async fn get_user(s: &str) -> Option<User> { /* ... */ }
let user_id = Some("sessiontoken").and_then(|session_token| {
    get_user(session_token)
        .await // <- error
        .map(|user| user.id)
});
但它不起作用,因为await只能在异步块或函数内部使用。另一方面,如果我使用异步块,则闭包将返回,impl Future<Output = Option<_>>这是不允许的,因为Option::and_then只允许Option作为返回类型。
我知道的唯一选择是使用OptionFuture,创建一个中间变量,而.map不是.and_then。
use futures::future::OptionFuture;
let user: OptionFuture<_> = Some("sessiontoken")
    .map(|session_token| {
        get_user(session_token)
    })
    .into();
let user_id = user.await.flatten().map(|user| user.id);
但这确实很麻烦,并且阻止我使用许多对选项、结果或类似操作进行操作的函数,例如Option::and_then.
难道就没有更好的办法了吗?
我正在尝试使用crates_io_api. 我试图从流中获取数据,但我无法让它工作。
AsyncClient::all_crates返回一个impl Stream. 我如何从中获取数据?如果你提供代码会很有帮助。
我检查了异步书,但没有用。谢谢你。
这是我当前的代码。
use crates_io_api::{AsyncClient, Error};
use futures::stream::StreamExt;
async fn get_all(query: Option<String>) -> Result<crates_io_api::Crate, Error> {
  // Instantiate the client.
  let client = AsyncClient::new(
    "test (test@test.com)",
    std::time::Duration::from_millis(10000),
  )?;
  let stream = client.all_crates(query);
  // what should I do after?
  // ERROR: `impl Stream cannot be unpinned`
  while let Some(item) = stream.next().await {
      // ...
  }
}
我有以下代码
use std::future::Future;
fn main() {
    handle(Test::my_func);
}
fn handle<Fut>(fun: for<'r> fn(&'r mut Test) -> Fut) -> bool
where
    Fut: Future<Output = ()>,
{
    true
}
struct Test {}
impl Test {
    pub async fn my_func<'r>(&'r mut self) -> () {
        ()
    }
}
此外,您可以在Rust Playground上在线运行它。
出现以下错误:
error[E0308]: mismatched types
  --> src/main.rs:4:12
   |
4  |     handle(Test::my_func);
   |            ^^^^^^^^^^^^^ one type is more general than the other
...
17 |     pub async fn my_func<'r>(&'r mut self) -> () { …rust lifetime-scoping rust-async-std rust-lifetimes rust-futures
有没有更惯用或更漂亮的方法来在 Rust 中进行这样的操作?
let maybe_output = match maybe_input {
    Some(input) => Some(async_result(input).await?),
    None => None,
};
我尝试map这样使用,
let maybe_output = maybe_input.map(|input| async_result(input).await?);
但我不能在不返回 future 或结果的 lambda 中使用.awaitand运算符。?我怀疑我可以绘制地图并得到一个Option<Future<Result<_>>>,然后将其Future<Result<_>>排序Future<Result<Option<_>>>;
let maybe_output = maybe_input.map(|input|
    async_result(input)
).transpose().await?;
但我不知道是否可以调用transposeFuture,因为它是一个特征,而不是像 Result 或 Option 这样的类型。
如何让 Rust 执行所有给定的 future(例如join_all!)限制为一次执行 10 个 future?
我需要从大量服务器下载文件,但同时查询不超过 10 台服务器(以准确测量它们的超时:如果我一次查询太多服务器,它们就会超时,即使服务器本身很快)。
rust ×7
rust-futures ×7
async-await ×3
rust-tokio ×2
asynchronous ×1
concurrency ×1
option-type ×1