标签: rust-tokio

东京::选择!但对于未来的 Vec

我有一个Vec我想同时执行的 future(但不一定是并行的)。基本上,我正在寻找某种select类似于tokio::select!但需要 future 集合的函数,或者相反,一种类似于futures::join_all但在第一个 future 完成后返回的函数。

另一个要求是,一旦 future 完成,我可能想向Vec.

有了这样的函数,我的代码大致如下所示:

use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

// This pseudo-implementation simply removes the last
// future and awaits it. I'm looking for something that
// instead polls all futures until one is finished, then
// removes that future from the Vec and returns it.
async fn select<F, O>(futures: &mut Vec<F>) -> …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

16
推荐指数
1
解决办法
8442
查看次数

在 tokio 任务中传播恐慌的推荐方法是什么?

现在我的恐慌正在被吞噬。在我的用例中,我希望它使整个程序崩溃并打印堆栈跟踪。我应该如何配置它?

rust rust-tokio

15
推荐指数
1
解决办法
6201
查看次数

如何从 Tokio 的非主线程运行异步任务?

use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}
Run Code Online (Sandbox Code Playgroud)

编译时我收到警告:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled
Run Code Online (Sandbox Code Playgroud)

执行时出现错误: …

asynchronous rust rust-tokio

12
推荐指数
2
解决办法
6008
查看次数

tokio::spawn(my_future).await 和 my_future.await 有什么区别?

给定一个异步函数及其相应的未来,让我们说:

async fn foo() -> Result<i32, &'static str> {
    // ...
}

let my_future = foo();
Run Code Online (Sandbox Code Playgroud)

除了使用 tokio::spawn().await 之外,只使用 .await 等待它有什么区别?

// like this...
let result1 = my_future.await;

// ... and like this
let result2 = tokio::spawn(my_future).await;
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

12
推荐指数
1
解决办法
2515
查看次数

reqwest::Error { kind: 解码, source: Error("期望值", 行: 1, 列: 1) }'

我在创建此 POST 请求时收到以下错误。我是 RUST 的新手。

相反,serde_json::Value我什至尝试过HashMap<String, String>同样的问题。如果你能告诉我我的标头是否错误,或者我如何跟踪它是否实际上是网络请求问题?

这是我期待的实际回应。Session我尝试将serde_json::Value 替换为 Session仍然没有效果,错误仍然存​​在。

#[derive(Debug, Deserialize, Serialize)]
pub struct Session {
    pub platform_type: String,
    pub ticket: String,
    pub profile_id: String,
    pub user_id: String,
    pub name_on_platform: String,
    pub expiration: String, //2020-08-26T16:46:59.4772040Z
}
Run Code Online (Sandbox Code Playgroud)

我正在使用 Popos 20.04 和 Rust 1.45.2

    pub async fn login(&self) -> Result<serde_json::Value, reqwest::Error> {
        let response = self.client
            .post(request_url)
            .headers(self.construct_headers())
            .basic_auth(self.email.clone(), Some(self.password.clone()))
            .send().await?
            .json::<serde_json::Value>().await?;

        Ok(response)
    }

    fn construct_headers(&self) -> HeaderMap {
        let mut headers = …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio reqwest

12
推荐指数
1
解决办法
5721
查看次数

Rust 中的 tokio::spawn 和 tokio::task::spawn 有什么区别?

我正在制作一个能够使用tokio.

我通过 创建了一个任务tokio::spawn,并且我tokio::task::spawn也看到了工作。

tokio::sapwn和 和有什么区别tokio::task::spawn

rust rust-tokio

12
推荐指数
1
解决办法
2114
查看次数

从期货转发::流到期货::汇

我目前正试图围绕tokio&期货原型和生态系统.

我开始从tk-http websockets示例做一些工作,并希望对接收到的数据进行更多处理,而不是回复它.第一步似乎是用某种循环替换.forward()调用.

在我看来,这stream.forward(sink)相当于stream.fold(sink, |out_, item| { out.send(item).and_then(Sink::flush) }),但是这样做(提交)流根本没有被轮询.然而,Tokio示例中的相同更改工作正常(示例).

此外,似乎首先在接收器上输出内容使得转发工作正常(提交),因此在发送发生之前,接收器可能由于某种原因没有在事件循环中注册?我错过了什么?它是否可能是tk-http中的错误?

rust rust-tokio

11
推荐指数
0
解决办法
699
查看次数

找不到 tokio::main 宏?

我正在我的 Windows 系统中创建一个示例 Rust 项目,以在异步模式下通过 HTTP GET 请求下载文件。

我的代码如下(与Rust Cookbook中提到的代码相同):

extern crate error_chain;
extern crate tempfile;
extern crate tokio;
extern crate reqwest;

use error_chain::error_chain;
use std::io::copy;
use std::fs::File;
use tempfile::Builder;

error_chain! {
     foreign_links {
         Io(std::io::Error);
         HttpRequest(reqwest::Error);
     }
}

#[tokio::main]
async fn main() -> Result<()> {
    let tmp_dir = Builder::new().prefix("example").tempdir()?;
    let target = "https://www.rust-lang.org/logos/rust-logo-512x512.png";
    let response = reqwest::get(target).await?;

    let mut dest = {
        let fname = response
            .url()
            .path_segments()
            .and_then(|segments| segments.last())
            .and_then(|name| if name.is_empty() { None } else { …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-cargo rust-tokio

11
推荐指数
2
解决办法
4468
查看次数

如何在 Tokio 中安排重复任务?

我将用 Rust 编写的同步套接字代码替换为使用 Tokio 的异步等效代码。Tokio 使用 future 进行异步活动,因此任务会链接在一起并排队到执行器上,由线程池执行。

我想做的基本伪代码是这样的:

let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let in_buf = vec![0u8; 8192];
    // TODO this should happen continuously until an error happens
    let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    };
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);
Run Code Online (Sandbox Code Playgroud)

这个伪代码只会执行我的任务一次。我如何连续运行它?我希望它执行,然后一次又一次执行等等。我只希望它在出现紧急情况或有错误结果代码时停止执行。最简单的方法是什么?

future rust rust-tokio

10
推荐指数
1
解决办法
5075
查看次数

如何在 Tokio 中为 CPU 密集型工作创建专用线程池?

我有一个基于Tokio的 Rust 异步服务器运行时。它必须同时处理对延迟敏感的 I/O 密集型请求和大量 CPU 密集型请求。

我不想让 CPU 密集型任务垄断 Tokio 运行时并使 I/O 密集型任务饿死,所以我想将 CPU 密集型任务卸载到专用的、隔离的线程池(隔离是这里的关键,所以spawn_blocking/block_in_place在一个共享线程池上是不够的)。如何在 Tokio 中创建这样的线程池?

启动两个运行时的幼稚方法会遇到错误:

线程“tokio-runtime-worker”因“无法从运行时内启动运行时”而恐慌。发生这种情况是因为一个函数(如block_on)试图在当前线程被用于驱动异步任务时阻塞当前线程。

use tokio; // 0.2.20

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}
Run Code Online (Sandbox Code Playgroud)

Tokio 可以允许两个独立的运行时吗?有没有更好的方法在 Tokio 中创建隔离的 CPU 池?

threadpool rust rust-tokio

10
推荐指数
3
解决办法
3835
查看次数