我的目标是同时运行 N 个函数,但在所有函数完成之前不想生成更多函数。这是我到目前为止所拥有的:
extern crate tokio;
extern crate futures;
use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;
fn main() {
let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
.for_each(|interval| {
println!("Interval: {:?}", interval);
for i in 0..5 {
tokio::spawn(lazy(move || {
println!("Hello from task {}", i);
// mock delay (something blocking)
// thread::sleep(time::Duration::from_secs(3));
Command::new("sleep").arg("3").output().expect("failed to execute process");
Ok(())
}));
}
Ok(())
})
.map_err(|e| panic!("interval errored; err={:?}", e));
tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)
我每秒生成 5 个函数,但现在我想等到所有函数完成后再生成更多函数。
根据我的理解(我的想法可能是错误的),我将Future 在另一个未来返回
extern crate tokio;
extern …Run Code Online (Sandbox Code Playgroud) 我正在尝试读取文件、解密它并返回数据。因为文件可能非常大,所以我想在流中执行此操作。
我找不到一个好的模式来实现流。我正在尝试做这样的事情:
let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
if decrypted_length < start + length {
let mut encrypted_chunk = vec![0u8; encrypted_block_size];
match f.read(&mut encrypted_chunk[..]) {
Ok(size) => {
if size > 0 {
let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
let updated_decrypted_length = decrypted_length + decrypted.len();
Some((decrypted, updated_decrypted_length))
} else {
None
}
}
Err(e) => {
println!("Error {}", e);
None
}
}
} else {
None
}
});
Run Code Online (Sandbox Code Playgroud)
问题是f.read上面的异步闭包中不允许这样做,并出现以下错误:
let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
if …Run Code Online (Sandbox Code Playgroud) 目前我可以使用以下代码,但我不想在 postgres 查询中将 JSON 转换为文本,因为它会增加延迟。
async fn filter_data(min : f32, max : f32, pool: &Pool) -> Result<String, PoolError> {
let client: Client = pool.get().await?;
let sql = format!("select \"json\"::TEXT from get_data({}, {})", min, max);
let stmt = client.prepare(&sql).await?;
let rows = client.query(&stmt, &[]).await?;
Ok(rows[0].get(0))
}
Run Code Online (Sandbox Code Playgroud)
如果我不将 JSON 转换为文本,则会收到以下错误:
error retrieving column 0: error deserializing column 0: cannot convert between the Rust type `alloc::string::String` and the Postgres type `jsonb`
Run Code Online (Sandbox Code Playgroud)
可以使用什么类型以便我返回该 json 值而不将其转换为文本?
目前我正在编写一个纯 Rust MQTT5 库(我知道那里有现有的库,但我更想学习 Rust),我偶然发现了这个问题。
我正在使用最新的稳定版 rust 和 tokio 1.0.1。
当我通过线路发送数据包时,我经常期望服务器做出响应(下面的示例是 PingReq/PingAck、Ping/Pong)。
省略了很多有关超时和数据包冲突的逻辑,我用 JavaScript 编写了逻辑的简化版本(因为我对此非常了解)。
这种逻辑将如何转化为 Rust 及其未来?或者更清楚地说:我可以以某种方式重新创建awaitPackage + onIncomingPacket 的resolve() 回调函数行为吗?
class Client {
awaitedPacketTypes = {};
/**
* a ping consist of a send ping and a receive pong
*/
async ping(){
await this.sendPacket("Ping");
return await this.awaitPackage("Pong");
}
async sendPacket(packetType) { /*...*/ }
/**
* This expects a specific packet type to be received in the future
* @param {*} packetType
*/
awaitPackage(packetType) {
return new Promise((resolve, reject) …Run Code Online (Sandbox Code Playgroud) 该rng板条箱既不是Sync也不是Send,因此 rng::thread_rng 不能跨越 .await 点。在异步 Rust 中生成随机数的最简单且美观的方法是什么?
预先生成大量数字然后使用它们是丑陋的。
我正在尝试 tokio 的结果tokio::spawn,tokio::task::spawn结果我不明白后者的行为方式。
当我运行以下代码时:
#[tokio::main]
pub async fn main() {
// I'm spawning one block of functions
let h = tokio::task::spawn_blocking(move || {
block_one();
});
// and another block of functions
let h2 = tokio::spawn(async move {
block_two().await;
});
// then I collect the handles
h.await.unwrap();
h2.await.unwrap();
}
#[tokio::main] //needed as this block is not treated as syncronous by main
pub async fn block_one() {
let mut handles = vec![];
for i in 1..10 {
let …Run Code Online (Sandbox Code Playgroud) 我正在尝试重现 Shepmasters 对这个问题的回答,但出现以下编译错误。
error[E0599]: the method `for_each` exists for struct `tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>`, but its trait bounds were not satisfied
--> src/main.rs:19:10
|
19 | .for_each(|s| async move { println!("> {:?}", s) })
| ^^^^^^^^ method cannot be called on `tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>` due to unsatisfied trait bounds
|
::: /home/.../tokio-1.7.1/src/io/util/lines.rs:10:1
|
10 | / pin_project! {
11 | | /// Read lines from an [`AsyncBufRead`].
12 | | ///
13 | | /// A `Lines` can be turned into a `Stream` …Run Code Online (Sandbox Code Playgroud) 我无法理解tokio::try_run!返回tokio::spawn. Err当我运行以下示例时:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let h1 = tokio::spawn(async {
sleep(Duration::from_millis(100)).await;
// 1/0; commented for now
let v: Result<i32, ()> = Err(());
v
});
let h2 = tokio::spawn(async {
sleep(Duration::from_millis(500)).await;
println!("h2 didn't get canceled");
let v: Result<i32, ()> = Ok(2);
v
});
match tokio::try_join!(h1, h2) {
Ok((first, second)) => {
println!("try_join was successful, got {:?} and {:?}", first, second);
}
Err(err) => {
println!("try_join had an error: {:?}", err);
} …Run Code Online (Sandbox Code Playgroud) 我知道 Axum 是建立在 Tokio 之上的,并且 Tokio 有一个多线程调度程序和当前线程调度程序。
是否可以设置运行时以使其在单线程中服务请求?
我想使用Hyper的当前主分支编写服务器,该分支保存由POST请求传递的消息,并将此消息发送到每个传入的GET请求.
我有这个,大多是从Hyper示例目录中复制的:
extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
use futures::future::FutureResult;
use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;
struct Echo {
data: Vec<u8>,
}
impl Echo {
fn new() -> Self {
Echo {
data: "text".into(),
}
}
}
impl Service for Echo {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = FutureResult<Response, hyper::Error>;
fn call(&self, req: Self::Request) -> Self::Future {
let resp = match …Run Code Online (Sandbox Code Playgroud)