标签: rust-tokio

如何在 rust/tokio 中的 TCP 客户端上绑定()?

我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()

我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。

我是否遗漏了什么,或者我需要使用一些较低级别的 API?

rust rust-tokio

5
推荐指数
1
解决办法
2689
查看次数

为什么 tokio::spawn 即使使用 .clone() 也会抱怨生命周期?

我试图编译以下看似简单的代码,但出现错误:

use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)

从之前关于该主题的讨论和示例中,我了解到传递对 …

lifetime rust async-await rust-tokio

5
推荐指数
1
解决办法
2925
查看次数

从函数返回未来值

我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String

#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
    let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
    let response = reqwest::get(url)
                            .await?
                            .text()
                            .await?;
    Ok(response)
 }
Run Code Online (Sandbox Code Playgroud)

我在 main() 中期望的是获取并打印响应值:

fn main() {
    let response = download();
    println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
8484
查看次数

为什么我的 tokio 任务在完成之前就退出了?

下面的程序应该从多个线程定期打印,但它没有按我的预期工作:

# Cargo.toml
[dependencies]
tokio = { version = "0.3", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
use tokio::prelude::*; //0.3.4
use tokio::runtime::Builder;
use tokio::time::Duration;

fn main() {
    let rt = Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(3 * 1024 * 1024)
        .build()
        .unwrap();

    rt.block_on(async {
        tokio::spawn(print_thread(1));
        tokio::spawn(print_thread(2));
        tokio::spawn(print_thread(3));
        tokio::spawn(print_thread(4));
    });
}

async fn print_thread(thread_num: usize) {
    tokio::spawn(async move {
        println!("thread{}-start", thread_num);
        loop {
            tokio::time::sleep(Duration::from_millis(1000)).await;
            println!("thread{}-running", thread_num);
        }
        println!("thread{}-start", thread_num);
    });
}
Run Code Online (Sandbox Code Playgroud)

运行这个时,我得到:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.64s
     Running `target/debug/time_test`
thread1-start
thread2-start …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
2268
查看次数

Rust 中根据编译目标操作系统将不同类型的值分配给变量的惯用方法是什么?

我正在开发一个绑定到 Tokio 套接字并管理 TCP 连接的代码库。在生产中,它绑定到AF_VSOCK使用tokio-vsock板条箱。

\n

在 Mac 上本地开发时,AF_VSOCKAPI 不可用,因为没有hypervisor -> VM连接 \xe2\x80\x94\xc2\xa0it\ 只是使用cargo run.

\n

在本地运行时,我一直在创建一个标准tokio::net::TcpListener结构,在生产中我一直在创建一个tokio_vsock::VsockListener. 这两种结构大多可以互换并公开相同的方法。无论使用哪个结构,其余代码都可以完美运行。

\n

到目前为止,我只是保留了这两个结构,并简单地注释掉了本地不需要的结构 \xe2\x80\x94 这显然不是“好的做法”。我的代码如下:

\n
#[tokio::main]\nasync fn main() -> Result<(), ()> {\n    // Production AF_VSOCK listener (comment out locally)\n    let mut listener = tokio_vsock::VsockListener::bind(\n        &SockAddr::Vsock(\n          VsockAddr::new(\n            VMADDR_CID_ANY,\n            LISTEN_PORT,\n          )\n        )\n    )\n    .expect("Unable to bind AF_VSOCK listener");\n\n    // Local TCP listener (comment out in production)\n    let mut …
Run Code Online (Sandbox Code Playgroud)

compiler-errors compilation rust rust-cargo rust-tokio

5
推荐指数
1
解决办法
361
查看次数

tokio 是多线程的吗?

我知道 tokio 允许编写并发代码。但我不确定它是否并行运行。我的电脑有八个核心。所以理想情况下我运行的线程不超过八个。如果我需要更多并发性,我会在这些线程之上运行协程(使用 tokio)。

当然,除非 tokio 已经是多线程的了。在这种情况下,一开始就创建这八个线程将会适得其反。所以我想问的是,tokio 默认情况下是多线程的,还是我应该自己实现?

concurrency rust async-await rust-tokio

5
推荐指数
1
解决办法
6535
查看次数

我如何在 Rust futures reqwest 中接受无效或自签名的 SSL 证书?

我的代码如下所示:

let fetches = futures::stream::iter(
    hosts.into_iter().map(|url| {
        async move {
                match reqwest::get(&url).await {
                    // Ok and Err statements here!
                }
Run Code Online (Sandbox Code Playgroud)

但是,这里的问题是,对于具有无效或自签名 SSL 证书的 URL,它会给出错误。所以,我尝试执行以下操作:

let fetches = futures::stream::iter(
    hosts.into_iter().map(|url| {
        async move {
            match reqwest::Client::builder().danger_accept_invalid_certs(true).build().unwrap().get(&url).await {
                // Ok and Err statements here!
            }
Run Code Online (Sandbox Code Playgroud)

当我尝试使用 Cargo 构建它时,它显示“错误[E0277]:`RequestBuilder`不是未来”。

那么,如何让我的代码接受无效证书呢?

rust rust-cargo rust-tokio reqwest

5
推荐指数
1
解决办法
6047
查看次数

如何使用异步 indicatif 显示总计数器栏?

我尝试使用指示板条箱来显示子任务的多个进度条以及一个计算所有已完成任务的进度条。这是我的代码:

# Cargo.toml
[dependencies]
indicatif = "0.15.0"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Run Code Online (Sandbox Code Playgroud)
//! main.rs
use std::time::Duration;
use futures::{StreamExt, stream::futures_unordered::FuturesUnordered};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let m = MultiProgress::new();
    let sty = ProgressStyle::default_bar()
        .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}")
        .progress_chars("##-");
    let total_pb = m.add(ProgressBar::new(3));
    total_pb.set_style(sty.clone());
    let mut futs = FuturesUnordered::new();
    let durations = [15u64, 8, 3];
    let mut pb_cnt = 0usize;
    for &duration in durations.iter() …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
2167
查看次数

如何在另一个任务中生成长时间运行的 Tokio 任务而不阻塞父任务?

我正在尝试构建一个对象,该对象可以管理来自 websocket 的提要,但能够在多个提要之间切换。

有一个Feed特点:

trait Feed {
    async fn start(&mut self);
    async fn stop(&mut self);
}
Run Code Online (Sandbox Code Playgroud)

共有三个结构体实现FeedABC

start被调用时,它会启动一个无限循环,监听来自 websocket 的消息并处理每条传入的消息。

我想实现一个FeedManager维护单个活动源但可以接收命令来切换它正在使用的源的命令。

enum FeedCommand {
    Start(String),
    Stop,
}

struct FeedManager {
    active_feed_handle: tokio::task::JoinHandle,
    controller: mpsc::Receiver<FeedCommand>,
}

impl FeedManager {
    async fn start(&self) {
        while let Some(command) = self.controller.recv().await {
            match command {
                FeedCommand::Start(feed_type) => {
                    // somehow tell the active feed to stop (need channel probably) or …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
2
解决办法
6553
查看次数

在编译时等待许多未知的 future

我想利用 Tokio 的运行时来处理可变数量的异步 future。由于 futures 的数量在编译时是未知的,似乎FuturesUnordered是我最好的选择(宏,例如select!需要在编译时指定您的分支;join_all可能是可能的,但文档建议“在很多情况下”当 order 不可用时使用 FuturesUnordered )没关系)。

这段代码的逻辑是一个recv()循环被推送到futures桶中,它应该始终运行。当新数据到达时,它的解析/处理也被推送到 futures 桶(而不是立即处理)。这确保接收器在响应新事件时保持低延迟,并且数据处理(可能需要大量计算的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。

.boxed()顺便说一句,这个帖子解释了为什么期货会变得。

问题是这个神秘的错误:

错误[E0277] :`dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
  --> src/main.rs:27:8
    | 
27  | 27     }).boxed());
   |        ^^^^^  `dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
   | 
   = help : `dyn futures::Future<Output = ()> + std::marker::Send` 未实现 `Sync` 特性
   =注意:需要,因为 `Sync` 的 impl 要求Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
    = note:必需的,因为它出现在类型 `Box<dyn futures::Future<Output = ()> …

rust async-await rust-tokio

5
推荐指数
1
解决办法
2861
查看次数