相关疑难解决方法(0)

如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待 future?

我正在使用 Tokio 1.1 来做异步事情。我有一个async mainwith #[tokio::main],所以我已经在使用运行时进行操作。

main调用一个非异步方法,我希望await在未来(具体来说,我正在从数据融合数据帧收集)。这种非异步方法具有由特征规定的签名,该特征返回结构体,而不是Future<Struct>. 据我所知,我无法将其标记为异步。

如果我尝试打电话df.collect().await;,我会得到

只允许在async函数和块内部

来自编译器的错误,指出我await在其中调用的方法不是async.

如果我尝试block_on从一个新的运行时开始,未来是这样的:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());
Run Code Online (Sandbox Code Playgroud)

我遇到运行时恐慌:

无法从运行时内部启动运行时。发生这种情况是因为函数(如block_on)试图在当前线程用于驱动异步任务时阻止该线程。

如果我尝试futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:

“当前未在 Tokio 0.2.x 运行时上运行。”

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比应该的更难。我处于异步上下文中,感觉编译器应该知道这一点并允许我.await从方法内调用 - 唯一的代码路径从块内调用此方法async。有没有一种简单的方法可以做到我所缺少的?

future rust async-await rust-tokio

24
推荐指数
1
解决办法
2万
查看次数

在Rust中async/await的目的是什么?

在像C#这样的语言中,给出这个代码(我没有await故意使用关键字):

async Task Foo()
{
    var task = LongRunningOperationAsync();

    // Some other non-related operation
    AnotherOperation();

    result = task.Result;
}
Run Code Online (Sandbox Code Playgroud)

在第一行中,long操作在另一个线程中运行,并Task返回a(即未来).然后,您可以执行另一个与第一个并行运行的操作,最后,您可以等待操作完成.我认为,这也是行为async/ await在Python,JavaScript等

另一方面,在Rust中,我在RFC中读到:

Rust的期货与其他语言的期货之间的根本区别在于,除非进行调查,否则Rust的期货不会做任何事情.整个系统是围绕这个建立的:例如,取消正在降低未来正是出于这个原因.相比之下,在其他语言中,调用异步fn会旋转一个立即开始执行的未来.

在这种情况下,是什么目的async/ await鲁斯特?看到其他语言,这种表示法是一种运行并行操作的便捷方式,但是如果调用async函数没有运行任何东西,我无法看到它在Rust中是如何工作的.

syntax asynchronous future rust async-await

12
推荐指数
3
解决办法
4729
查看次数

如何在 Tokio 中为 CPU 密集型工作创建专用线程池?

我有一个基于Tokio的 Rust 异步服务器运行时。它必须同时处理对延迟敏感的 I/O 密集型请求和大量 CPU 密集型请求。

我不想让 CPU 密集型任务垄断 Tokio 运行时并使 I/O 密集型任务饿死,所以我想将 CPU 密集型任务卸载到专用的、隔离的线程池(隔离是这里的关键,所以spawn_blocking/block_in_place在一个共享线程池上是不够的)。如何在 Tokio 中创建这样的线程池?

启动两个运行时的幼稚方法会遇到错误:

线程“tokio-runtime-worker”因“无法从运行时内启动运行时”而恐慌。发生这种情况是因为一个函数(如block_on)试图在当前线程被用于驱动异步任务时阻塞当前线程。

use tokio; // 0.2.20

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

Tokio 可以允许两个独立的运行时吗?有没有更好的方法在 Tokio 中创建隔离的 CPU 池?

threadpool rust rust-tokio

10
推荐指数
3
解决办法
3835
查看次数

什么时候应该使用 Tokio 的 `spawn_blocking`?

task文档中,有一节讨论了在异步中调用阻塞代码,以及如何避免这种情况,以免过多地阻塞异步线程(https://docs.rs/tokio/1.21.2/tokio/task/ index.html#blocking-and-yielding)。

它还讨论了用于tokio::task::spawn_blocking这些任务的方法,但我想知道在什么时候建议将工作发送到不同的线程?我目前正在编写一个程序,可以恢复大量的 ECDSA 签名,每条消息大约需要 100 微秒,同时进行大量的网络 IO。作为一个具体的例子,这足以使用类似的东西吗spawn_blocking

rust async-await rust-tokio

8
推荐指数
1
解决办法
9431
查看次数

是否应该使用同步角色,actix_web :: web :: block或futures-cpupool运行柴油?

背景

我正在通过r2d2使用柴油的actix-web应用程序上工作,并且不确定如何最好地进行异步查询。我发现了三个看似合理的选择,但不确定哪个是最好的。

潜在解决方案

同步演员

对于我来说,我可以使用actix示例,但是它非常复杂,需要大量样板来构建。我希望有一个更合理的解决方案。

Actix_web::web::block

作为另一种选择,我可以使用将actix_web::web::block查询功能包装到将来,但是我不确定这样做的性能含义。

然后,该查询是否在同一Tokio系统中运行?从我在源代码中可以找到的地方,它在基础的actix-web线程池中创建了一个线程。那是问题吗?

如果我没看错代码,则r2d2在获取连接时会阻塞其线程,这会阻塞部分核心actix-web池。与数据库查询相同。如果我执行的查询多于该池中的线程数,那么这将阻止所有actix-web?如果是这样,那就大问题了。

期货

最后,可能会有一些不必要开销的安全选择是futures-cpupool。主要问题是,这意味着要向我的项目中添加另一个板条箱,尽管我不喜欢不必要地在应用程序中浮动多个cpu池的想法。

由于r2d2和柴油都会阻塞,因此这里有令人惊讶的棘手事情。

最重要的是,不要与不使用同一r2d2池的任何事物共享此cpupool(因为创建的所有线程可能只是阻塞等待r2d2连接,在工作存在时锁定整个池)。

其次(更明显一点),因此您不应该拥有比池中线程更多的r2d2连接,反之亦然,因为更大的r2d2连接会浪费资源(连接未使用/线程不断被阻塞)(也许还有一个线程,也许更快)由OS调度程序而不是cpupool调度程序进行连接切换)。

最后,请注意您正在使用的数据库以及那里的性能。在写繁琐的sqlite应用程序中运行单个连接r2d2和池中的单个线程可能是最好的选择(尽管我会为此推荐一个合适的数据库)。

旧答案

可能有效的旧解决方案

https://www.reddit.com/r/rust/comments/axy0hp/patterns_to_scale_actixweb_and_diesel/

本质上,建议使用Futures-cpupool。

在future-rs中封装阻塞I / O的最佳方法是什么?

对于一般情况,建议使用Futures-cpupool。

无效的旧解决方案

https://www.reddit.com/r/rust/comments/9fe1ye/noob_here_can_we_talk_about_async_and_databases/

对旧的actix-web版本的一个非常好的修复。从我可以找到的请求中,不再有CPU池。

rust rust-diesel actix-web

6
推荐指数
1
解决办法
252
查看次数

如何并行运行多个调用 thread::sleep 的期货?

我有一个缓慢的未来,在运行完成之前阻塞了 1 秒。

我尝试使用join组合器,但复合未来my_app按顺序执行期货:

#![feature(pin, futures_api, arbitrary_self_types)]

extern crate futures; // v0.3

use futures::prelude::*;
use futures::task::Context;
use std::pin::PinMut;
use std::{thread, time};
use futures::executor::ThreadPoolBuilder;

struct SlowComputation {}

impl Future for SlowComputation {
    type Output = ();

    fn poll(self: PinMut<Self>, _cx: &mut Context) -> Poll<Self::Output> {
        let millis = time::Duration::from_millis(1000);
        thread::sleep(millis);

        Poll::Ready(())
    }
}

fn main() {
    let fut1 = SlowComputation {};
    let fut2 = SlowComputation {};
    let my_app = fut1.join(fut2);

    ThreadPoolBuilder::new()
        .pool_size(5)
        .create()
        .expect("Failed to create threadpool") …
Run Code Online (Sandbox Code Playgroud)

future rust

5
推荐指数
1
解决办法
2276
查看次数

如何从产生数据块的慢速处理侧线程流式传输超级请求的正文?

我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生大量数据;每个响应可以是 1GiB,不适合内存,并且必须按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。

让我们跳过样板(service_fn, Server::bind)。

缓慢生成数据的 API 可能类似于

use std::io;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}

type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());

    // spawn a thread and call slow.next_block() until len()==0
    // each …
Run Code Online (Sandbox Code Playgroud)

rust hyper rust-tokio

5
推荐指数
1
解决办法
2622
查看次数

异步任务中止时会发生什么?

Rust 有async可以与Abortable期货相关联的方法。文档说,中止时:

未来将立即完成,没有任何进一步的进展。

绑定到未来的任务所拥有的变量会被删除吗?如果这些变量实现了drop,会drop被调用吗?如果未来产生了其他未来,它们是否会在一个链中流产?

例如:在下面的代码片段中,我没有看到为中止的任务发生析构函数,但我不知道它是否未被调用或发生在未显示打印的单独线程中。

use futures::executor::block_on;
use futures::future::{AbortHandle, Abortable};

struct S {
    i: i32,
}

impl Drop for S {
    fn drop(&mut self) {
        println!("dropping S");
    }
}

async fn f() -> i32 {
    let s = S { i: 42 };
    std::thread::sleep(std::time::Duration::from_secs(2));
    s.i
}

fn main() {
    println!("first test...");
    let (abort_handle, abort_registration) = AbortHandle::new_pair();
    let _ = Abortable::new(f(), abort_registration);
    abort_handle.abort();
    std::thread::sleep(std::time::Duration::from_secs(1));

    println!("second test...");
    let …
Run Code Online (Sandbox Code Playgroud)

asynchronous future rust

5
推荐指数
1
解决办法
1147
查看次数

为什么Future :: select首先选择睡眠时间较长的未来?

我试图理解Future::select:在这个例子中,首先返回具有较长时间延迟的未来.

当我通过它的例子阅读这篇文章时,我得到了认知失调.作者写道:

select函数运行两个(或者更多select_all)期货,并返回第一个完成.这对于实现超时很有用.

看来我不明白这种感觉select.

extern crate futures;
extern crate tokio_core;

use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;

struct Timeout {
    time: u32,
}

impl Timeout {
    fn new(period: u32) -> Timeout {
        Timeout { time: period }
    }
}

impl Future for Timeout {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
        thread::sleep(Duration::from_secs(self.time as u64));
        println!("Timeout is done with time {}.", …
Run Code Online (Sandbox Code Playgroud)

select future rust

4
推荐指数
1
解决办法
1195
查看次数

使Rust线程退出阻塞操作的标准方法是什么?

来自Java,我习惯于遵循以下习惯用法

while (true) {
  try {
    someBlockingOperation();
  } catch (InterruptedException e) {
    Thread.currentThread.interrupt(); // re-set the interrupted flag
    cleanup(); // whatever is necessary
    break;
  }
}
Run Code Online (Sandbox Code Playgroud)

据我所知,它可以在整个JDK中处理任何可能阻塞的内容,例如从文件读取,从套接字读取,从队列读取甚至对于Thread.sleep()

阅读有关如何在Rust中完成此操作的信息,我发现了许多看似特殊的解决方案,例如miotokio。我也找到ErrorKind::Interrupted并尝试ErrorKind通过发送SIGINT到线程来解决这个问题,但是线程似乎立即死亡而没有留下任何(返回)跟踪。

这是我使用的代码(注意:Rust还不很精通,因此看起来可能有些奇怪,但是可以运行):

use std::io;
use std::io::Read;
use std::thread;

pub fn main() {
    let sub_thread = thread::spawn(|| {
        let mut buffer = [0; 10];
        loop {
            let d = io::stdin().read(&mut buffer);
            println!("{:?}", d);
            let n = d.unwrap();
            if n == 0 { …
Run Code Online (Sandbox Code Playgroud)

multithreading terminate rust

4
推荐指数
1
解决办法
1023
查看次数

如何使用 Tokio 同时运行一组函数,而无需同时运行相同的函数?

我的目标是同时运行 N 个函数,但在所有函数完成之前不想生成更多函数。这是我到目前为止所拥有的

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
    .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)

我每秒生成 5 个函数,但现在我想等到所有函数完成后再生成更多函数。

根据我的理解(我的想法可能是错误的),我将Future 在另一个未来返回

extern crate tokio;
extern …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

4
推荐指数
1
解决办法
4010
查看次数

使用线程和异步/等待时如何解决“无法返回引用本地数据的值”?

我正在学习 Rust,特别是并行的多线程和异步请求。

我阅读了文档,但仍然不明白我在哪里犯了错误。我想我知道在哪里,但不知道如何解决它。

主程序.rs

use std::thread;

struct Request {
    url: String,
}

impl Request {
    fn new(name: &str) -> Request {
        Request {
            url: name.to_string(),
        }
    }

    async fn call(&self, x: &str) -> Result<(), Box<dyn std::error::Error>> {
        let resp = reqwest::get(x).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let requests = vec![
        Request::new("https://www.google.com/"),
        Request::new("https://www.google.com/"),
    ];
    let handles: Vec<_> = requests
        .into_iter()
        .map(|request| {
            thread::spawn(move || async {
                request.call(&request.url).await;
            })
        })
        .collect();

    for y in handles {
        println!("{:?}", y);
    } …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio reqwest

3
推荐指数
1
解决办法
1759
查看次数