标签: rust-futures

Rust 异步删除

我面临着一种情况,我需要从对象的放置处理程序运行异步代码。整个应用程序在 tokio 异步上下文中运行,因此我知道 drop 处理程序是使用活动的 tokio 运行时调用的,但不幸的是 drop 本身是一个同步函数。

理想情况下,我想要一个适用于多线程和当前线程运行时的解决方案,但如果不存在,那么我可以使用阻止删除线程并依赖其他线程来驱动的解决方案期货。

我考虑了多种选择,但我不确定哪种方法最好,也不了解它们的权衡。对于这些示例,我们假设我的类有一个async terminate(&mut self)我希望从 调用的函数drop()

struct MyClass;
impl MyClass {
    async fn terminate(&mut self) {}
}
Run Code Online (Sandbox Code Playgroud)

选项1:tokio::runtime::Handle::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        tokio::runtime::Handle::current().block_on(self.terminate());
    }
}
Run Code Online (Sandbox Code Playgroud)

这似乎是最直接的方法,但不幸的是它会引起恐慌

Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Run Code Online (Sandbox Code Playgroud)

操场

我对此有点困惑,因为我认为Handle::block_on会使用当前正在运行的运行时,但它似乎试图启动一个新的运行时?这里发生了什么? …

asynchronous rust async-await rust-tokio rust-futures

23
推荐指数
1
解决办法
4688
查看次数

采用异步闭包的函数,该闭包采用引用并通过引用捕获

我想做这样的事情:


// NOTE: This doesn't compile

struct A { v: u32 }

async fn foo<
    C: for<'a> FnOnce(&'a A) -> Pin<Box<dyn Future<Output = ()> + 'a>>
>(c: C) {
    c(&A {
        v: 8,
    }).await
}

#[tokio::main]
async fn main() {
    let t = 9;
    foo(|a| async {
        println!("{} {}", t, a.v);
    }.boxed_local()).await;
}
Run Code Online (Sandbox Code Playgroud)

函数foo采用“异步闭包”,为其提供引用,并且“异步闭包”也允许通过引用捕获事物。上面的编译状态t的生命周期需要是“静态的”,这对我来说很有意义。

但我不确定我是否理解为什么当我将通用生命周期放在传递给它编译的“异步闭包”的引用类型上时:

struct A<'a> { v: u32, _phantom: std::marker::PhantomData<&'a ()> }

async fn foo<
    'b,
    C: for<'a> FnOnce(&'a A<'b>) -> …
Run Code Online (Sandbox Code Playgroud)

rust rust-futures

11
推荐指数
1
解决办法
1105
查看次数

如何在 Option::and_then 或 Option::map 闭包内使用 async/await 而不使用 OptionFuture?

我想运行类似以下代码的代码:

async fn get_user(s: &str) -> Option<User> { /* ... */ }

let user_id = Some("sessiontoken").and_then(|session_token| {
    get_user(session_token)
        .await // <- error
        .map(|user| user.id)
});
Run Code Online (Sandbox Code Playgroud)

但它不起作用,因为await只能在异步块或函数内部使用。另一方面,如果我使用异步块,则闭包将返回,impl Future<Output = Option<_>>这是不允许的,因为Option::and_then只允许Option作为返回类型。

我知道的唯一选择是使用OptionFuture,创建一个中间变量,而.map不是.and_then

use futures::future::OptionFuture;

let user: OptionFuture<_> = Some("sessiontoken")
    .map(|session_token| {
        get_user(session_token)
    })
    .into();
let user_id = user.await.flatten().map(|user| user.id);
Run Code Online (Sandbox Code Playgroud)

但这确实很麻烦,并且阻止我使用许多对选项、结果或类似操作进行操作的函数,例如Option::and_then.

难道就没有更好的办法了吗?

rust async-await rust-futures

9
推荐指数
1
解决办法
6033
查看次数

impl Stream 不能取消固定

我正在尝试使用crates_io_api. 我试图从流中获取数据,但我无法让它工作。

AsyncClient::all_crates返回一个impl Stream. 我如何从中获取数据?如果你提供代码会很有帮助。

我检查了异步书,但没有用。谢谢你。

这是我当前的代码。

use crates_io_api::{AsyncClient, Error};
use futures::stream::StreamExt;

async fn get_all(query: Option<String>) -> Result<crates_io_api::Crate, Error> {
  // Instantiate the client.
  let client = AsyncClient::new(
    "test (test@test.com)",
    std::time::Duration::from_millis(10000),
  )?;

  let stream = client.all_crates(query);

  // what should I do after?
  // ERROR: `impl Stream cannot be unpinned`
  while let Some(item) = stream.next().await {
      // ...
  }
}
Run Code Online (Sandbox Code Playgroud)

rust rust-futures

5
推荐指数
1
解决办法
166
查看次数

Rust 中的“一种类型比另一种更通用”错误,而类型相同

我有以下代码

use std::future::Future;
fn main() {
    handle(Test::my_func);
}

fn handle<Fut>(fun: for<'r> fn(&'r mut Test) -> Fut) -> bool
where
    Fut: Future<Output = ()>,
{
    true
}

struct Test {}

impl Test {
    pub async fn my_func<'r>(&'r mut self) -> () {
        ()
    }
}
Run Code Online (Sandbox Code Playgroud)

此外,您可以在Rust Playground上在线运行它。

出现以下错误:

error[E0308]: mismatched types
  --> src/main.rs:4:12
   |
4  |     handle(Test::my_func);
   |            ^^^^^^^^^^^^^ one type is more general than the other
...
17 |     pub async fn my_func<'r>(&'r mut self) -> () { …
Run Code Online (Sandbox Code Playgroud)

rust lifetime-scoping rust-async-std rust-lifetimes rust-futures

4
推荐指数
1
解决办法
118
查看次数

如何从期权中转出未来?

有没有更惯用或更漂亮的方法来在 Rust 中进行这样的操作?

let maybe_output = match maybe_input {
    Some(input) => Some(async_result(input).await?),
    None => None,
};

Run Code Online (Sandbox Code Playgroud)

我尝试map这样使用,

let maybe_output = maybe_input.map(|input| async_result(input).await?);
Run Code Online (Sandbox Code Playgroud)

但我不能在不返回 future 或结果的 lambda 中使用.awaitand运算符。?我怀疑我可以绘制地图并得到一个Option<Future<Result<_>>>,然后将其Future<Result<_>>排序Future<Result<Option<_>>>

let maybe_output = maybe_input.map(|input|
    async_result(input)
).transpose().await?;
Run Code Online (Sandbox Code Playgroud)

但我不知道是否可以调用transposeFuture,因为它是一个特征,而不是像 Result 或 Option 这样的类型。

rust async-await option-type rust-futures

4
推荐指数
1
解决办法
1629
查看次数

限制 join_all!() 中并发 future 的数量

如何让 Rust 执行所有给定的 future(例如join_all!)限制为一次执行 10 个 future?

我需要从大量服务器下载文件,但同时查询不超过 10 台服务器(以准确测量它们的超时:如果我一次查询太多服务器,它们就会超时,即使服务器本身很快)。

concurrency rust rust-tokio rust-futures

3
推荐指数
1
解决办法
2865
查看次数