我有一个Vec我想同时执行的 future(但不一定是并行的)。基本上,我正在寻找某种select类似于tokio::select!但需要 future 集合的函数,或者相反,一种类似于futures::join_all但在第一个 future 完成后返回的函数。
另一个要求是,一旦 future 完成,我可能想向Vec.
有了这样的函数,我的代码大致如下所示:
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;
async fn wait(millis: u64) -> u64 {
sleep(Duration::from_millis(millis)).await;
millis
}
// This pseudo-implementation simply removes the last
// future and awaits it. I'm looking for something that
// instead polls all futures until one is finished, then
// removes that future from the Vec and returns it.
async fn select<F, O>(futures: &mut Vec<F>) -> …Run Code Online (Sandbox Code Playgroud) 现在我的恐慌正在被吞噬。在我的用例中,我希望它使整个程序崩溃并打印堆栈跟踪。我应该如何配置它?
use std::thread;
use tokio::task; // 0.3.4
#[tokio::main]
async fn main() {
thread::spawn(|| {
task::spawn(async {
println!("123");
});
})
.join();
}
Run Code Online (Sandbox Code Playgroud)
编译时我收到警告:
warning: unused `std::result::Result` that must be used
--> src/main.rs:6:5
|
6 | / thread::spawn(|| {
7 | | task::spawn(async {
8 | | println!("123");
9 | | });
10 | | })
11 | | .join();
| |____________^
|
= note: `#[warn(unused_must_use)]` on by default
= note: this `Result` may be an `Err` variant, which should be handled
Run Code Online (Sandbox Code Playgroud)
执行时出现错误: …
给定一个异步函数及其相应的未来,让我们说:
async fn foo() -> Result<i32, &'static str> {
// ...
}
let my_future = foo();
Run Code Online (Sandbox Code Playgroud)
除了使用 tokio::spawn().await 之外,只使用 .await 等待它有什么区别?
// like this...
let result1 = my_future.await;
// ... and like this
let result2 = tokio::spawn(my_future).await;
Run Code Online (Sandbox Code Playgroud) 我在创建此 POST 请求时收到以下错误。我是 RUST 的新手。
相反,serde_json::Value我什至尝试过HashMap<String, String>同样的问题。如果你能告诉我我的标头是否错误,或者我如何跟踪它是否实际上是网络请求问题?
这是我期待的实际回应。Session我尝试将serde_json::Value 替换为 Session仍然没有效果,错误仍然存在。
#[derive(Debug, Deserialize, Serialize)]
pub struct Session {
pub platform_type: String,
pub ticket: String,
pub profile_id: String,
pub user_id: String,
pub name_on_platform: String,
pub expiration: String, //2020-08-26T16:46:59.4772040Z
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 Popos 20.04 和 Rust 1.45.2
pub async fn login(&self) -> Result<serde_json::Value, reqwest::Error> {
let response = self.client
.post(request_url)
.headers(self.construct_headers())
.basic_auth(self.email.clone(), Some(self.password.clone()))
.send().await?
.json::<serde_json::Value>().await?;
Ok(response)
}
fn construct_headers(&self) -> HeaderMap {
let mut headers = …Run Code Online (Sandbox Code Playgroud) 我正在制作一个能够使用tokio.
我通过 创建了一个任务tokio::spawn,并且我tokio::task::spawn也看到了工作。
tokio::sapwn和 和有什么区别tokio::task::spawn?
我目前正试图围绕tokio&期货原型和生态系统.
我开始从tk-http websockets示例做一些工作,并希望对接收到的数据进行更多处理,而不是回复它.第一步似乎是用某种循环替换.forward()调用.
在我看来,这stream.forward(sink)相当于stream.fold(sink, |out_, item| { out.send(item).and_then(Sink::flush) }),但是这样做(提交)流根本没有被轮询.然而,Tokio示例中的相同更改工作正常(示例).
此外,似乎首先在接收器上输出内容使得转发工作正常(提交),因此在发送发生之前,接收器可能由于某种原因没有在事件循环中注册?我错过了什么?它是否可能是tk-http中的错误?
我正在我的 Windows 系统中创建一个示例 Rust 项目,以在异步模式下通过 HTTP GET 请求下载文件。
我的代码如下(与Rust Cookbook中提到的代码相同):
extern crate error_chain;
extern crate tempfile;
extern crate tokio;
extern crate reqwest;
use error_chain::error_chain;
use std::io::copy;
use std::fs::File;
use tempfile::Builder;
error_chain! {
foreign_links {
Io(std::io::Error);
HttpRequest(reqwest::Error);
}
}
#[tokio::main]
async fn main() -> Result<()> {
let tmp_dir = Builder::new().prefix("example").tempdir()?;
let target = "https://www.rust-lang.org/logos/rust-logo-512x512.png";
let response = reqwest::get(target).await?;
let mut dest = {
let fname = response
.url()
.path_segments()
.and_then(|segments| segments.last())
.and_then(|name| if name.is_empty() { None } else { …Run Code Online (Sandbox Code Playgroud) 我将用 Rust 编写的同步套接字代码替换为使用 Tokio 的异步等效代码。Tokio 使用 future 进行异步活动,因此任务会链接在一起并排队到执行器上,由线程池执行。
我想做的基本伪代码是这样的:
let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
let in_buf = vec![0u8; 8192];
// TODO this should happen continuously until an error happens
let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
/* ... Logic I want to happen repeatedly as bytes are read ... */
Ok(())
};
tokio::spawn(read_task);
Ok(())
}).map_err(|err| {
error!("Accept error = {:?}", err);
});
tokio::run(server_task);
Run Code Online (Sandbox Code Playgroud)
这个伪代码只会执行我的任务一次。我如何连续运行它?我希望它执行,然后一次又一次执行等等。我只希望它在出现紧急情况或有错误结果代码时停止执行。最简单的方法是什么?
我有一个基于Tokio的 Rust 异步服务器运行时。它必须同时处理对延迟敏感的 I/O 密集型请求和大量 CPU 密集型请求。
我不想让 CPU 密集型任务垄断 Tokio 运行时并使 I/O 密集型任务饿死,所以我想将 CPU 密集型任务卸载到专用的、隔离的线程池(隔离是这里的关键,所以spawn_blocking/block_in_place在一个共享线程池上是不够的)。如何在 Tokio 中创建这样的线程池?
启动两个运行时的幼稚方法会遇到错误:
线程“tokio-runtime-worker”因“无法从运行时内启动运行时”而恐慌。发生这种情况是因为一个函数(如
block_on)试图在当前线程被用于驱动异步任务时阻塞当前线程。
use tokio; // 0.2.20
fn main() {
let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!
main_runtime.block_on(main_runtime.spawn(async move {
cpu_pool.spawn(async {}).await
}))
.unwrap().unwrap();
}
Run Code Online (Sandbox Code Playgroud)
Tokio 可以允许两个独立的运行时吗?有没有更好的方法在 Tokio 中创建隔离的 CPU 池?
rust ×10
rust-tokio ×10
async-await ×1
asynchronous ×1
future ×1
reqwest ×1
rust-cargo ×1
threadpool ×1