标签: rust-tokio

Rust 中根据编译目标操作系统将不同类型的值分配给变量的惯用方法是什么?

我正在开发一个绑定到 Tokio 套接字并管理 TCP 连接的代码库。在生产中,它绑定到AF_VSOCK使用tokio-vsock板条箱。

\n

在 Mac 上本地开发时,AF_VSOCKAPI 不可用,因为没有hypervisor -> VM连接 \xe2\x80\x94\xc2\xa0it\ 只是使用cargo run.

\n

在本地运行时,我一直在创建一个标准tokio::net::TcpListener结构,在生产中我一直在创建一个tokio_vsock::VsockListener. 这两种结构大多可以互换并公开相同的方法。无论使用哪个结构,其余代码都可以完美运行。

\n

到目前为止,我只是保留了这两个结构,并简单地注释掉了本地不需要的结构 \xe2\x80\x94 这显然不是“好的做法”。我的代码如下:

\n
#[tokio::main]\nasync fn main() -> Result<(), ()> {\n    // Production AF_VSOCK listener (comment out locally)\n    let mut listener = tokio_vsock::VsockListener::bind(\n        &SockAddr::Vsock(\n          VsockAddr::new(\n            VMADDR_CID_ANY,\n            LISTEN_PORT,\n          )\n        )\n    )\n    .expect("Unable to bind AF_VSOCK listener");\n\n    // Local TCP listener (comment out in production)\n    let mut …
Run Code Online (Sandbox Code Playgroud)

compiler-errors compilation rust rust-cargo rust-tokio

5
推荐指数
1
解决办法
361
查看次数

tokio 是多线程的吗?

我知道 tokio 允许编写并发代码。但我不确定它是否并行运行。我的电脑有八个核心。所以理想情况下我运行的线程不超过八个。如果我需要更多并发性,我会在这些线程之上运行协程(使用 tokio)。

当然,除非 tokio 已经是多线程的了。在这种情况下,一开始就创建这八个线程将会适得其反。所以我想问的是,tokio 默认情况下是多线程的,还是我应该自己实现?

concurrency rust async-await rust-tokio

5
推荐指数
1
解决办法
6535
查看次数

我如何在 Rust futures reqwest 中接受无效或自签名的 SSL 证书?

我的代码如下所示:

let fetches = futures::stream::iter(
    hosts.into_iter().map(|url| {
        async move {
                match reqwest::get(&url).await {
                    // Ok and Err statements here!
                }
Run Code Online (Sandbox Code Playgroud)

但是,这里的问题是,对于具有无效或自签名 SSL 证书的 URL,它会给出错误。所以,我尝试执行以下操作:

let fetches = futures::stream::iter(
    hosts.into_iter().map(|url| {
        async move {
            match reqwest::Client::builder().danger_accept_invalid_certs(true).build().unwrap().get(&url).await {
                // Ok and Err statements here!
            }
Run Code Online (Sandbox Code Playgroud)

当我尝试使用 Cargo 构建它时,它显示“错误[E0277]:`RequestBuilder`不是未来”。

那么,如何让我的代码接受无效证书呢?

rust rust-cargo rust-tokio reqwest

5
推荐指数
1
解决办法
6047
查看次数

如何使用异步 indicatif 显示总计数器栏?

我尝试使用指示板条箱来显示子任务的多个进度条以及一个计算所有已完成任务的进度条。这是我的代码:

# Cargo.toml
[dependencies]
indicatif = "0.15.0"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Run Code Online (Sandbox Code Playgroud)
//! main.rs
use std::time::Duration;
use futures::{StreamExt, stream::futures_unordered::FuturesUnordered};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let m = MultiProgress::new();
    let sty = ProgressStyle::default_bar()
        .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}")
        .progress_chars("##-");
    let total_pb = m.add(ProgressBar::new(3));
    total_pb.set_style(sty.clone());
    let mut futs = FuturesUnordered::new();
    let durations = [15u64, 8, 3];
    let mut pb_cnt = 0usize;
    for &duration in durations.iter() …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
2167
查看次数

如何在另一个任务中生成长时间运行的 Tokio 任务而不阻塞父任务?

我正在尝试构建一个对象,该对象可以管理来自 websocket 的提要,但能够在多个提要之间切换。

有一个Feed特点:

trait Feed {
    async fn start(&mut self);
    async fn stop(&mut self);
}
Run Code Online (Sandbox Code Playgroud)

共有三个结构体实现FeedABC

start被调用时,它会启动一个无限循环,监听来自 websocket 的消息并处理每条传入的消息。

我想实现一个FeedManager维护单个活动源但可以接收命令来切换它正在使用的源的命令。

enum FeedCommand {
    Start(String),
    Stop,
}

struct FeedManager {
    active_feed_handle: tokio::task::JoinHandle,
    controller: mpsc::Receiver<FeedCommand>,
}

impl FeedManager {
    async fn start(&self) {
        while let Some(command) = self.controller.recv().await {
            match command {
                FeedCommand::Start(feed_type) => {
                    // somehow tell the active feed to stop (need channel probably) or …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
2
解决办法
6553
查看次数

在编译时等待许多未知的 future

我想利用 Tokio 的运行时来处理可变数量的异步 future。由于 futures 的数量在编译时是未知的,似乎FuturesUnordered是我最好的选择(宏,例如select!需要在编译时指定您的分支;join_all可能是可能的,但文档建议“在很多情况下”当 order 不可用时使用 FuturesUnordered )没关系)。

这段代码的逻辑是一个recv()循环被推送到futures桶中,它应该始终运行。当新数据到达时,它的解析/处理也被推送到 futures 桶(而不是立即处理)。这确保接收器在响应新事件时保持低延迟,并且数据处理(可能需要大量计算的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。

.boxed()顺便说一句,这个帖子解释了为什么期货会变得。

问题是这个神秘的错误:

错误[E0277] :`dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
  --> src/main.rs:27:8
    | 
27  | 27     }).boxed());
   |        ^^^^^  `dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
   | 
   = help : `dyn futures::Future<Output = ()> + std::marker::Send` 未实现 `Sync` 特性
   =注意:需要,因为 `Sync` 的 impl 要求Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
    = note:必需的,因为它出现在类型 `Box<dyn futures::Future<Output = ()> …

rust async-await rust-tokio

5
推荐指数
1
解决办法
2861
查看次数

rust tokio 特征边界在前向方法上不满足

我升级了wraptokio在我的 Rust 项目中,升级后,前向方法出现错误。我查了一下文档,新版本的tokio框架中并没有forward方法。

错误

error[E0599]: the method `forward` exists for struct `tokio::sync::mpsc::UnboundedReceiver<_>`, but its trait bounds were not satisfied


tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
                                      ^^^^^^^ method cannot be called on `tokio::sync::mpsc::UnboundedReceiver<_>` due to unsatisfied trait bounds
40 | pub struct UnboundedReceiver<T> {
   | -------------------------------
   | |
   | doesn't satisfy `_: warp::Stream`
   | doesn't satisfy `tokio::sync::mpsc::UnboundedReceiver<_>: StreamExt`
   |
   = note: the following trait bounds were not satisfied:
           `tokio::sync::mpsc::UnboundedReceiver<_>: warp::Stream`
           which is required by `tokio::sync::mpsc::UnboundedReceiver<_>: StreamExt`
           `&tokio::sync::mpsc::UnboundedReceiver<_>: warp::Stream`
           which is required …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
3005
查看次数

AsyncReadExt::read_u64 取消安全吗?

在它的文档中AsyncReadExt::read_u64说它具有与 相同的错误AsyncReadExt::read_exact,但没有提及取消安全性。read_<type>这同样适用于上的所有其他函数AsyncReadExt。它们似乎具有与read_exact(即没有)相同的取消安全性,但这是真的吗?

是否有另一种方法以取消安全的方式读取接下来的 4 个字节?

Tokio 中有一些东西可以在更高级别上涵盖我的用例,但我想知道我自己将如何做到这一点。

rust rust-tokio

5
推荐指数
1
解决办法
149
查看次数

block_in_place和spawn_blocking如何选择?

我与 tokio 合作很多,并且一直在使用spawn_blocking来处理将阻塞线程的代码。然后我看到了block_in_place的文档,它似乎是前者的无限制(发送,'静态)版本。

我的问题是,如果我已经在线程运行时,什么时候不建议使用 block_in_place ?每种驱动同步码的方法有什么区别和优点?如果我多次阻塞(例如同时在所有线程中),这会是一个问题吗?它是如何工作的?

我阅读了所有的 tokio 文档,但没有找到这些问题的答案,所以感觉在这里问是正确的。

rust rust-tokio

5
推荐指数
1
解决办法
2263
查看次数

如何优雅地关闭 warp 服务器?

我正在用 Rust 编写一个带有扭曲的服务。当服务收到 SIGTERM 信号时,我希望它正常关闭,并可能执行一些日志记录或其他工作。

我尝试了很多例子,但没有任何效果。最有希望的似乎来自这个问题,但我似乎无法让它工作,甚至无法编译。我怀疑自从回答这个问题以来事情已经发生了变化。

# Cargo.toml
[dependencies]
tokio = {version = "1", features = ["full"]}
warp = "0.3"
futures = "0.3"
Run Code Online (Sandbox Code Playgroud)
//! main.rs
use warp::Filter;
use futures;

fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::run(futures::future::lazy(move || {
        let routes = warp::any().map(|| "Hello, World!");
        let (_, server) = warp::serve(routes)
            .bind_with_graceful_shutdown(([127, 0, 0, 1], 3030), rx);
        warp::spawn(server);
    }));

    println!("Exiting!");
}
Run Code Online (Sandbox Code Playgroud)
//! main.rs
use warp::Filter;
use futures;

fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::run(futures::future::lazy(move || { …
Run Code Online (Sandbox Code Playgroud)

signals shutdown rust warp rust-tokio

5
推荐指数
1
解决办法
2302
查看次数