标签: rust-tokio

如何使用 Tokio 同时运行一组函数,而无需同时运行相同的函数?

我的目标是同时运行 N 个函数,但在所有函数完成之前不想生成更多函数。这是我到目前为止所拥有的

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
    .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)

我每秒生成 5 个函数,但现在我想等到所有函数完成后再生成更多函数。

根据我的理解(我的想法可能是错误的),我将Future 在另一个未来返回

extern crate tokio;
extern …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

4
推荐指数
1
解决办法
4010
查看次数

如何通过读取和转换文件来创建流?

我正在尝试读取文件、解密它并返回数据。因为文件可能非常大,所以我想在流中执行此操作。

我找不到一个好的模式来实现流。我正在尝试做这样的事情:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});
Run Code Online (Sandbox Code Playgroud)

问题是f.read上面的异步闭包中不允许这样做,并出现以下错误:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if …
Run Code Online (Sandbox Code Playgroud)

rust hyper rust-tokio

4
推荐指数
1
解决办法
9958
查看次数

Rust 使用 Postgres JSON 属性:无法在 Rust 类型 `alloc::string::String` 和 Postgres 类型 `jsonb` 之间进行转换

目前我可以使用以下代码,但我不想在 postgres 查询中将 JSON 转换为文本,因为它会增加延迟。

async fn filter_data(min : f32, max : f32, pool: &Pool) -> Result<String, PoolError> {
    let client: Client = pool.get().await?;
    let sql = format!("select \"json\"::TEXT from get_data({}, {})", min, max);
    let stmt = client.prepare(&sql).await?;
    let rows = client.query(&stmt, &[]).await?;
    Ok(rows[0].get(0))
}
Run Code Online (Sandbox Code Playgroud)

如果我不将 JSON 转换为文本,则会收到以下错误:

error retrieving column 0: error deserializing column 0: cannot convert between the Rust type `alloc::string::String` and the Postgres type `jsonb`
Run Code Online (Sandbox Code Playgroud)

可以使用什么类型以便我返回该 json 值而不将其转换为文本?

postgresql json rust rust-tokio

4
推荐指数
1
解决办法
3898
查看次数

如何将 JS Promise 翻译为 Rust

目前我正在编写一个纯 Rust MQTT5 库(我知道那里有现有的库,但我更想学习 Rust),我偶然发现了这个问题。

我正在使用最新的稳定版 rust 和 tokio 1.0.1。

当我通过线路发送数据包时,我经常期望服务器做出响应(下面的示例是 PingReq/PingAck、Ping/Pong)。

省略了很多有关超时和数据包冲突的逻辑,我用 JavaScript 编写了逻辑的简化版本(因为我对此非常了解)。

这种逻辑将如何转化为 Rust 及其未来?或者更清楚地说:我可以以某种方式重新创建awaitPackage + onIncomingPacket 的resolve() 回调函数行为吗?

class Client {
  awaitedPacketTypes = {};

  /**
   * a ping consist of a send ping and a receive pong
   */
  async ping(){
    await this.sendPacket("Ping");
    return await this.awaitPackage("Pong");
  }

  async sendPacket(packetType) { /*...*/ }
  
  /**
   * This expects a specific packet type to be received in the future
   * @param {*} packetType 
   */
  awaitPackage(packetType) {
    return new Promise((resolve, reject) …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

4
推荐指数
1
解决办法
941
查看次数

如何在异步 Rust 中生成随机数?

rng板条箱既不是Sync也不是Send,因此 rng::thread_rng 不能跨越 .await 点。在异步 Rust 中生成随机数的最简单且美观的方法是什么?

预先生成大量数字然后使用它们是丑陋的。

rust rust-tokio

4
推荐指数
1
解决办法
4621
查看次数

意外的 tokio::task::spawn_blocking 行为

我正在尝试 tokio 的结果tokio::spawntokio::task::spawn结果我不明白后者的行为方式。

当我运行以下代码时:

#[tokio::main]
pub async fn main() {
    // I'm spawning one block of functions
    let h = tokio::task::spawn_blocking(move || {
        block_one();
    });

    // and another block of functions
    let h2 = tokio::spawn(async move {
        block_two().await;
    });

    // then I collect the handles
    h.await.unwrap();
    h2.await.unwrap();
}

#[tokio::main] //needed as this block is not treated as syncronous by main
pub async fn block_one() {
    let mut handles = vec![];

    for i in 1..10 {
        let …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

4
推荐指数
1
解决办法
4243
查看次数

不阻塞地读取 Childstdout

我正在尝试重现 Shepmasters 对这个问题的回答,但出现以下编译错误。

error[E0599]: the method `for_each` exists for struct `tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>`, but its trait bounds were not satisfied
  --> src/main.rs:19:10
   |
19 |           .for_each(|s| async move { println!("> {:?}", s) })
   |            ^^^^^^^^ method cannot be called on `tokio::io::Lines<tokio::io::BufReader<tokio::process::ChildStdout>>` due to unsatisfied trait bounds
   | 
  ::: /home/.../tokio-1.7.1/src/io/util/lines.rs:10:1
   |
10 | / pin_project! {
11 | |     /// Read lines from an [`AsyncBufRead`].
12 | |     ///
13 | |     /// A `Lines` can be turned into a `Stream` …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

4
推荐指数
1
解决办法
2024
查看次数

东京::尝试加入!当任务之一返回 Err 时不返回 Err 变体?

我无法理解tokio::try_run!返回tokio::spawn. Err当我运行以下示例时:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let h1 = tokio::spawn(async {
        sleep(Duration::from_millis(100)).await;
        // 1/0; commented for now
        let v: Result<i32, ()> = Err(());
        v
    });

    let h2 = tokio::spawn(async {
        sleep(Duration::from_millis(500)).await;
        println!("h2 didn't get canceled");
        let v: Result<i32, ()> = Ok(2);
        v
    });

    match tokio::try_join!(h1, h2) {
        Ok((first, second)) => {
            println!("try_join was successful, got {:?} and {:?}", first, second);
        }
        Err(err) => {
            println!("try_join had an error: {:?}", err);
        } …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

4
推荐指数
1
解决办法
1562
查看次数

是否可以在单线程上运行 Axum?

我知道 Axum 是建立在 Tokio 之上的,并且 Tokio 有一个多线程调度程序和当前线程调度程序。

是否可以设置运行时以使其在单线程中服务请求?

rust rust-tokio rust-axum

4
推荐指数
1
解决办法
2220
查看次数

如何阅读基于Tokio的Hyper请求的整个主体?

我想使用Hyper的当前主分支编写服务器,该分支保存由POST请求传递的消息,并将此消息发送到每个传入的GET请求.

我有这个,大多是从Hyper示例目录中复制的:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match …
Run Code Online (Sandbox Code Playgroud)

rust hyper rust-tokio

3
推荐指数
2
解决办法
2740
查看次数

标签 统计

rust ×10

rust-tokio ×10

hyper ×2

async-await ×1

json ×1

postgresql ×1

rust-axum ×1