标签: rust-tokio

多线程 tokio 是否在单个操作系统线程上运行任务?

当我在 tokio 多线程运行时生成一个任务时,是否保证它会选择一个运行时操作系统线程并仅在其上运行?或者在轮询 future 时它可能会在不同的操作系统线程之间跳转?

更新:如果一项任务可以在操作系统线程之间切换,是否可以将其配置为仅使用一个线程?

rust rust-tokio

1
推荐指数
1
解决办法
1407
查看次数

mpsc channels stucking at the receiver

use std::sync::Arc;

use tokio::sync::mpsc;

async fn student(id : i32,tx : Arc<mpsc::Sender<String>>) {
    println!("student {} is getting their hw.",id);
    tx.send(format!("student {}'s hw !",id)).await.unwrap();
}

async fn teacher(mut rc : mpsc::Receiver<String>) -> Vec<String> {
    let mut homeworks = Vec::new();
    while let Some(hw) = rc.recv().await {
        println!("{hw}");
        homeworks.push(hw);
    }
    homeworks
}

#[tokio::main]
async fn main() {
    let (tx,rc): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(100);
    let ch_arc: Arc<mpsc::Sender<String>> = Arc::new(tx);
    
    for i in 0..10 {
        tokio::task::spawn(student(i,ch_arc.clone()));
    }
    let hws = teacher(rc).await;
    println!("{:?}",hws);
}
Run Code Online (Sandbox Code Playgroud)

I am currently …

channel rust rust-tokio mpsc

1
推荐指数
1
解决办法
61
查看次数

在将消息从期货渠道转发到WebSocket接收器时,类型不匹配解决错误类型

我试图总结我的周围锈期货的头,但我这个代码是应该发送到达的消息弄得rxsink:

extern crate futures;
extern crate tokio_core;
extern crate websocket;

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;

fn main() {
    let mut core = Core::new().unwrap();
    let (mut tx, rx) = mpsc::channel(5);
    thread::spawn(|| worker(rx));
    let mut i = 0;
    loop {
        let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
        core.run(res);
        i += 1;
        let period = time::Duration::from_millis(200);
        thread::sleep(period);
    }
}

fn worker(rx: Receiver<OwnedMessage>) {
    let mut core …
Run Code Online (Sandbox Code Playgroud)

future rust rust-tokio

0
推荐指数
1
解决办法
1066
查看次数

预期的XYZ已找到()

例如:

use futures::future::Future;

fn main() {
    let (stop_tokio, time_to_stop) = tokio::sync::oneshot::channel::<()>();
    let handler = std::thread::spawn(|| {
        tokio::run(
            time_to_stop, //           .map_err(|_| ())
        );
    });
    handler.join().expect("join failed");
}
Run Code Online (Sandbox Code Playgroud)

编译器显示错误:

use futures::future::Future;

fn main() {
    let (stop_tokio, time_to_stop) = tokio::sync::oneshot::channel::<()>();
    let handler = std::thread::spawn(|| {
        tokio::run(
            time_to_stop, //           .map_err(|_| ())
        );
    });
    handler.join().expect("join failed");
}
Run Code Online (Sandbox Code Playgroud)

该代码要求使用()RecvError但改为,但是编译器打印相反的代码。

这是编译器中的错误,还是我错过了什么?

type-inference compiler-errors rust rust-tokio

0
推荐指数
1
解决办法
70
查看次数

是否有一种 FuturesOrdered 替代方案可以一一产生结果?

在 Rust 中,我有一堆想要并行执行的异步函数。处理这些函数结果的顺序很重要。我还想在这些函数可用时检索它们的结果。

\n

让我不好解释一下。

\n

以下是 的描述FuturesOrdered

\n
\n

这个“组合器”类似于 FuturesUnordered,但它在 future 集合之上强加了一个顺序。虽然集合中的 future 将并行完成\n,但结果将仅按其原始 future 添加到队列的顺序返回。

\n
\n

到目前为止,一切都很好。现在看这个例子:

\n
let mut ft = FuturesOrdered::new();\nft.push(wait_n(1)); // wait_n sleeps\nft.push(wait_n(2)); // for the given\nft.push(wait_n(4)); // number of secs\nft.push(wait_n(3));\nft.push(wait_n(5));\nlet r = ft.collect::<Vec<u64>>().await;\n
Run Code Online (Sandbox Code Playgroud)\n

由于FuturesOrdered\xc2\xa0await 直到所有future 都完成;这就是我得到的:

\n
|--|        ++\n|----|      ++\n|--------|  ++\n|------|    ++\n|----------|++\n            ++-> all results available here \n
Run Code Online (Sandbox Code Playgroud)\n

这就是我要的:

\n
|--|++\n|----|++\n|--------|++\n|------|    ++\n|----------|  ++\n               \n
Run Code Online (Sandbox Code Playgroud)\n

换句话说; 我要等待下一个未来;随着剩余的期货不断地完成。另请注意,即使任务#4 在任务#3 之前完成;由于最初的订单,它是在#3 之后处理的。

\n

我怎样才能获得像这样同时执行的期货流?我希望有这样的事情: …

future stream rust async-await rust-tokio

0
推荐指数
1
解决办法
1557
查看次数

错误:无法为 `syn` 选择版本

error: failed to select a version for `syn`. \
    ... required by package `serde_derive v1.0.125`\
    ... which satisfies dependency `serde_derive = "=1.0.125"` of package `serde v1.0.125`    
    ... which satisfies dependency `serde = "^1.0.125"` of package `mongodb v2.1.0`\
    ... which satisfies dependency `mongodb = "^2.1"` of package `wagmeet v0.1.0 
 \(/mnt/e/College/Eighth Semester/Crypto_Capable/wagmeet_app)`\
versions that meet the requirements `^1.0.60` are: 1.0.86, 1.0.85, 1.0.84, 1.0.83, 1.0.82, 1.0.81, 1.0.80, 1.0.79, 1.0.78, 1.0.77, 1.0.76, 1.0.75, 1.0.74, 1.0.73, 1.0.72, 1.0.71, 1.0.70, 1.0.69, 1.0.68, 1.0.67, 1.0.66, 1.0.65, 1.0.64, …
Run Code Online (Sandbox Code Playgroud)

rust rust-cargo rust-diesel rust-tokio nearprotocol

0
推荐指数
1
解决办法
4156
查看次数

Rust 将加密和压缩操作改为异步的目的是什么?

在我的理解中,异步只能处理I/O密集型任务,例如读写套接字或文件,而无法处理CPU密集型任务,例如加密和压缩。

所以在 Rust Tokio Runtime 中,我认为只需要用来spawn_blocking处理 CPU 密集型任务。但我看过这个回购协议,例子是

#[tokio_02::main]
async fn main() -> Result<()> {
    let data = b"example";
    let compressed_data = compress(data).await?;
    let de_compressed_data = decompress(&compressed_data).await?;
    assert_eq!(de_compressed_data, data);
    println!("{:?}", String::from_utf8(de_compressed_data).unwrap());
    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

该库在压缩和异步 I/O 类型之间创建适配器。

我有 3 个问题:

  1. 等待压缩/解压缩的目的是什么?

  2. 这些适配器是必要的还是我对异步的理解错误?

  3. 我可以直接在 Tokio 多线程运行时进行压缩操作吗?像这样

async fn foo() {
    let mut buf = ...;
    compress_sync(&mut buf);
    async_tcp_stream.write(buf).await;
}
Run Code Online (Sandbox Code Playgroud)

compression encryption asynchronous rust rust-tokio

0
推荐指数
1
解决办法
215
查看次数

Rust:如何在流适配器中使用一个值而不克隆它两次

目前,当我的流收到新值时,我尝试打印计数器值:


fn add_counter_to_stream<T: std::marker::Send + 'static>(
    stream: PinnedStream<T>,
) -> PinnedStream<T> {
    let counter = Arc::new(Mutex::new(0)); // Shared counter between threads
    futures::StreamExt::boxed(stream.map({
        let counter = Arc::clone(&counter);
        move |value| {
            let counter = Arc::clone(&counter);
            async move {
                // Print counter value and increment
                let mut num = counter.lock().await;
                println!("Counter: {}", *num);
                *num += 1;
            };
            value
        }
    }))
}
Run Code Online (Sandbox Code Playgroud)

Arc::clone(&counter)然而,如上所示,除非我创建两个嵌套闭包并使用两次,否则代码不会编译。有没有一种方法可以重构它,以便在一个克隆操作中完成,为什么在这个示例中我需要两个闭包?

rust rust-tokio

0
推荐指数
1
解决办法
38
查看次数

如何使 tokio 流同时工作

我想运行一系列流,其中多个流进程同时运行。我尝试的测试代码如下:

use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .then(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .chunks_timeout(3, Duration::from_secs(20));

    let _result : Vec<_> = stream
        .collect().await;
}
Run Code Online (Sandbox Code Playgroud)

此代码运行,但它会一一打印 10 个值,并有 1 秒的延迟。这与并发相反。我期望的是等待 3 秒,打印 3 个数字,然后等待 1 秒,依此类推。我认为这tokio::time::sleep很好,因为join_all我让代码同时工作。

那么,如何解释缺乏并发性以及如何解决呢?

asynchronous rust rust-tokio

-1
推荐指数
1
解决办法
1194
查看次数