标签: rust-tokio

启用 Tokio 投票未来的最小功能集是什么?

我想轮询一个异步函数:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    some_function().await;
}
Run Code Online (Sandbox Code Playgroud)

我目前正在激活所有功能:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    some_function().await;
}
Run Code Online (Sandbox Code Playgroud)

其中哪些是必要的?

tokio = { version = "1.4.0", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

9
推荐指数
1
解决办法
9099
查看次数

异步文件IO有什么意义吗?

像tokio这样的 Rust 异步运行时提供了许多标准函数的“异步”副本,包括一些文件 IO 函数,它们基本上只是通过调用相应的阻塞任务(在新线程上?)来工作。此类函数的示例有tokio::fs::create_dir_all, tokio::fs::read_dir, tokio::fs::read, ...

\n

所有这些功能有什么优点?为什么我应该更喜欢在异步上下文中使用它们而不是标准阻塞函数?如果我是.await他们的结果,有什么收获吗?

\n

一个示例是异步 Web 路由,它根据查询返回某个文件的内容(使用Rocket):

\n
#[get("/foo/<query>")]\nasync fn foo(query: &str) -> Option<String> {\n    let file_path = // interpret `query` somehow and find the target file\n    tokio::fs::read_to_string(file_path).await.ok()\n    // ^ Why not just `std::fs::read_to_string(file_path).ok()`?\n}\n
Run Code Online (Sandbox Code Playgroud)\n

async/.await我理解套接字 IO 或延迟任务(线程睡眠)的好处,但在这种情况下,这对我来说似乎毫无意义。但相反的 \xe2\x80\x94 这使得在代码中解决更复杂的任务变得更加困难(例如,在目录列表中搜索文件时使用流)。

\n

concurrency asynchronous rust async-await rust-tokio

9
推荐指数
2
解决办法
6373
查看次数

在 Rust 中使用 tokio 代替 OS 线程有什么好处

我正在尝试用 Rust 制作一个多线程 tcp 通信程序

这个想法是主线程上存在一个侦听套接字,并且当连接进入时,工作由工作线程处理

我之前使用了 Rust 书中找到的 ThreadPool 方法,但据我了解,tokio 能够“自动”将工作分配给池中的线程

我对操作系统线程和 tokio 任务之间的区别感到困惑(主要是因为您用来spawn创建两者)

这是一些代码

fn main() {
    println!("Hello World!");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
    println!("socket -> {}", socket);

    // You need to use the tokio runtime to execute async functions
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let listener = StartListen::new(socket).await.unwrap();
    });
}
Run Code Online (Sandbox Code Playgroud)

StartListen在另一个文件中定义了

// Defines the StartListen class
pub struct StartListen {
    listener: TcpListener,
}


// Implementation for …
Run Code Online (Sandbox Code Playgroud)

multithreading threadpool rust rust-cargo rust-tokio

9
推荐指数
1
解决办法
4123
查看次数

使用TcpConnectionNew时,不满足特征绑定`():futures :: Future`

我正在尝试使用Tokio crate 在Rust中编写一个简单的TCP客户端.我的代码非常接近这个例子减去TLS:

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::Future;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_io::io;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let connection = TcpStream::connect(&"127.0.0.1:8080".parse().unwrap(), &handle);

    let server = connection.and_then(|stream| {
        io::write_all(stream, b"hello");
    });

    core.run(server).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

但是,编译失败并显示错误:

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:16:29
   |
16 |     let server = connection.and_then(|stream| {
   |                             ^^^^^^^^ the trait `futures::Future` is not implemented for `()` …
Run Code Online (Sandbox Code Playgroud)

tcp rust rust-tokio

8
推荐指数
2
解决办法
4082
查看次数

将 AsyncRead 转换为字节的 TryStream 的最佳方法是什么?

我有一个AsyncRead,想Stream<Item = tokio::io::Result<Bytes>>用 tokio 0.2 和 futures 0.3将它转换为一个。

我能做的最好的是:

use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0

#[tokio::main]
async fn main() -> Result<()> {
    let file = File::open("some_file.txt").await?;
    let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
    fn_that_takes_stream(stream)
}

fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
    S: Stream<Item = Result<Bytes>>,
{
    //...
    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

似乎应该有一种更简单的方法;我很惊讶 Tokio 没有包含一个编解码器来获取一个流Bytes而不是,BytesMut或者没有一个扩展特性提供一种将 anAsyncRead转换为Stream. 我错过了什么吗?

rust async-await rust-tokio

8
推荐指数
1
解决办法
2340
查看次数

不同线程内两个 tokio 运行时之间的消息传递

我遇到了这个问题中描述的问题:如何在另一个 Tokio 运行时中创建 Tokio 运行时,而不会收到错误“无法从运行时内启动运行时”?

一些好的 Rust crate 没有异步执行器。我决定将所有此类库调用放在一个能够容忍此类操作的线程中。另一个线程应该能够使用 发送非闪烁消息tokio::channel

我编写了一个演示站来测试实施选项。在每个运行时内部进行调用tokio::spawn是为了了解 tokio 运行时和处理程序中的更多细节 - 这是问题的一部分。

问题。 如果我还有什么误解,请纠正我。
有两个 tokio 运行时。每个都在自己的线程中启动。tokio::spawn在第一个运行时内部调用会first_runtime()生成任务。tokio::spawn内部调用会second_runtime()在第二个运行时生成任务。tokio::channel这两个任务之间有一个。如果通道tx.send(...).await缓冲区未满,即使接收线程被调用阻塞,调用也不会阻塞发送线程thread::sleep()
我一切都做对了吗?这段代码的输出告诉我我是对的,但我需要确认我的推理。

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, Receiver, channel}; // 1.12.0


#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn first_runtime(tx: Sender<String>) {
    thread::sleep(Duration::from_secs(1));
    println!("first thread woke up");
    tokio::spawn(async move {
        for msg_id in 0..10 {
            if let Err(e) = tx.send(format!("message {}", msg_id)).await …
Run Code Online (Sandbox Code Playgroud)

multithreading rust rust-tokio

8
推荐指数
0
解决办法
1982
查看次数

将 reqwest bytes_stream 复制到 tokio 文件

我正在尝试将使用 reqwest 下载的文件复制到 tokio 文件中。该文件太大,无法存储在内存中,因此需要通过bytes_stream()而不是bytes()

我尝试过以下操作

let mut tmp_file = tokio::fs::File::from(tempfile::tempfile()?);
let byte_stream = reqwest::get(&link).await?.bytes_stream();
tokio::io::copy(&mut byte_stream, &mut tmp_file).await?;
Run Code Online (Sandbox Code Playgroud)

这失败了,因为

    |
153 |     tokio::io::copy(&mut byte_stream, &mut tmp_file).await?;
    |     --------------- ^^^^^^^^^^^^^^^^ the trait `tokio::io::AsyncRead` is not implemented for `impl Stream<Item = Result<bytes::bytes::Bytes, reqwest::Error>>`
    |     |
    |     required by a bound introduced by this call
Run Code Online (Sandbox Code Playgroud)

有什么方法可以在流上获取特征 AsyncRead 或以其他方式将此数据复制到文件中?我使用 tokio 文件的原因是我稍后需要从中进行 AsyncRead 。也许复制到常规 std::File 然后将其转换为 tokio::fs::File 是有意义的?

rust rust-tokio reqwest

8
推荐指数
1
解决办法
1850
查看次数

什么时候应该使用 Tokio 的 `spawn_blocking`?

task文档中,有一节讨论了在异步中调用阻塞代码,以及如何避免这种情况,以免过多地阻塞异步线程(https://docs.rs/tokio/1.21.2/tokio/task/ index.html#blocking-and-yielding)。

它还讨论了用于tokio::task::spawn_blocking这些任务的方法,但我想知道在什么时候建议将工作发送到不同的线程?我目前正在编写一个程序,可以恢复大量的 ECDSA 签名,每条消息大约需要 100 微秒,同时进行大量的网络 IO。作为一个具体的例子,这足以使用类似的东西吗spawn_blocking

rust async-await rust-tokio

8
推荐指数
1
解决办法
9431
查看次数

使用tokio_timer重复执行Rust任务

我正在使用Tokio框架在Rust中创建重复任务。下面的代码基于完成的更改请求,以将该功能添加到Tokio-Timer板条箱中。

尝试编译时,出现错误消息:

error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnMut<()>`, but the trait `std::ops::FnMut<((),)>` is required (expected tuple, found ())
  --> src/main.rs:19:36
   |
19 |     let background_tasks = wakeups.for_each(my_cron_func);
   |                                    ^^^^^^^^

error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnOnce<()>`, but the trait `std::ops::FnOnce<((),)>` is required (expected tuple, found ())
  --> src/main.rs:19:36
   |
19 |     let background_tasks = wakeups.for_each(my_cron_func);
   |                                    ^^^^^^^^

error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnMut<()>`, but …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

7
推荐指数
1
解决办法
1635
查看次数

`std::sync::mpsc::Sender&lt;i32&gt; 未实现特征 `std::marker::Sync`

我正在尝试使用 MPSC 构建多线程应用程序,但遇到了标题中的错误。我不确定这个用例的正确模式是什么 - 我正在寻找一种模式,它允许我克隆生产者通道并将其移动到要使用的新线程中。

这个新线程将保持一个打开的 websocket,并在收到 websocket 消息时通过生产者发送 websocket 消息数据的子集。消费者线程中将需要来自其他线程的数据,这就是为什么我认为 MPSC 模式是一个合适的选择。

除了标题中的消息之外,它还显示以下内容:

`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
Run Code Online (Sandbox Code Playgroud)

我可以/应该Send为此实施吗?Rc这是使用or的合适时机吗Pin?我相信这种情况正在发生,因为我试图发送一个未在闭包Send中实现的类型,但我不知道如何利用它或在这种情况下要达到什么目的。.awaitasync

我已经能够将我的问题简化为:

use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};

#[tokio::main]
async fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();

    tokio::spawn(async move {
        let a = [1, 2, 3];
        let mut s = stream::iter(a.iter())
            .cycle()
            .for_each(move |int| async { …
Run Code Online (Sandbox Code Playgroud)

concurrency rust async-await rust-tokio

7
推荐指数
1
解决办法
3190
查看次数