标签: rust-tokio

如何在 rust/tokio 中的 TCP 客户端上绑定()?

我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()

我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。

我是否遗漏了什么,或者我需要使用一些较低级别的 API?

rust rust-tokio

5
推荐指数
1
解决办法
2689
查看次数

为什么 tokio::spawn 即使使用 .clone() 也会抱怨生命周期?

我试图编译以下看似简单的代码,但出现错误:

use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)

从之前关于该主题的讨论和示例中,我了解到传递对 …

lifetime rust async-await rust-tokio

5
推荐指数
1
解决办法
2925
查看次数

从函数返回未来值

我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String

#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
    let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
    let response = reqwest::get(url)
                            .await?
                            .text()
                            .await?;
    Ok(response)
 }
Run Code Online (Sandbox Code Playgroud)

我在 main() 中期望的是获取并打印响应值:

fn main() {
    let response = download();
    println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
8484
查看次数

tokio::time::timeout 之后再次等待 future

背景:
我有一个进程,用于tokio::process在 tokio 运行时生成带有句柄的子进程。

它还负责在杀死子进程后释放资源,并且根据文档(s​​td::process::Childtokio::process::Child),这需要父进程wait()(或await在 tokio 中)进行该进程。

SIGINT并非所有进程对 a或 a 的行为都相同SIGTERM,因此我想在发送 a 之前给孩子一些时间来死掉SIGKILL

所需的解决方案:

    pub async fn kill(self) {
        // Close input
        std::mem::drop(self.stdin);

        // Send gracefull signal
        let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
        nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);

        // Give the process time to die gracefully
        if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
        {
            // Kill forcefully
            nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
            self.process.await;
        }
    }
Run Code Online (Sandbox Code Playgroud)

但是给出了这个错误:

error[E0382]: use of moved value: `self.process` …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
7135
查看次数

包装 AsyncRead

我似乎无法让编译器让我包装 Tokio AsyncRead:

use std::io::Result;
use core::pin::Pin;
use core::task::{Context, Poll};

use tokio::io::AsyncRead;

struct Wrapper<T: AsyncRead>{
    inner: T
}

impl<T: AsyncRead> AsyncRead for Wrapper<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8]
    ) -> Poll<Result<usize>> {
        self.inner.poll_read(cx, buf)
    }
}
Run Code Online (Sandbox Code Playgroud)

这似乎应该编译,但编译器抱怨我没有包含正确的特征绑定,即使poll_read可以通过 AsyncRead: Playground Link获得

error[E0599]: no method named `poll_read` found for type parameter `T` in the current scope
  --> src/lib.rs:17:20
   |
17 |         self.inner.poll_read(cx, buf)
   |                    ^^^^^^^^^ method not found in `T`
   |
   = help: …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
1204
查看次数

当使用 tokio::spawn 和带有可变引用的 future 时,“借用的值存在的时间不够长”

以下代码无法编译,因为编译器无法保证hashmap_of_lists其寿命足够长。我无法克服这一点。

我尝试过使用ArcandMutex但由于内部some_func使用的异步方式,我遇到了其他问题Mutex

use futures; // 0.3.5
use std::collections::HashMap;
use tokio; // 0.2.21

async fn some_func(_some_slice: &mut [String]) {}

#[tokio::main]
async fn main() {
    let mut hashmap_of_lists = HashMap::<String, Vec<String>>::new();
    let mut join_handles = Vec::new();

    for (_, value) in hashmap_of_lists.iter_mut() {
        let future = some_func(value);
        let join_handle = tokio::task::spawn(future);
        join_handles.push(join_handle);
    }

    futures::future::join_all(join_handles).await;
}
Run Code Online (Sandbox Code Playgroud)

我收到这个错误

error[E0597]: `hashmap_of_lists` does not live long enough
  --> src/main.rs:12:23
   |
12 |     for (_, value) in hashmap_of_lists.iter_mut() …
Run Code Online (Sandbox Code Playgroud)

asynchronous reference rust rust-tokio borrow

5
推荐指数
0
解决办法
2070
查看次数

如何查找 tokio::sync::mpsc::Receiver 是否已关闭?

我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。

在理想的世界中,我希望我的代码在伪代码中看起来像这样:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};
Run Code Online (Sandbox Code Playgroud)

问题是Sender没有is_closed方法。它确实有pub fn …

future rust rust-tokio

5
推荐指数
1
解决办法
3263
查看次数

tokio 任务调度程序是“窃取工作”还是先从全局队列中获取?

tokio中,当处理器完成运行队列中的所有任务时,它们是首先在全局队列中查找更多任务,还是首先尝试从同级处理器中窃取工作?

rust rust-tokio

5
推荐指数
1
解决办法
1135
查看次数

tokio::time::sleep 方法是否将任务从运行队列中取出?

我一直在寻找 tokio 源代码来获取问题的答案,我的印象是sleep 方法实际上放置了一个带有持续时间的计时器,但我认为我可能误解了代码,因为这样做效率非常低。是否可以更清楚地了解这一点?

rust rust-tokio

5
推荐指数
1
解决办法
1879
查看次数

为什么我的 tokio 任务在完成之前就退出了?

下面的程序应该从多个线程定期打印,但它没有按我的预期工作:

# Cargo.toml
[dependencies]
tokio = { version = "0.3", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
use tokio::prelude::*; //0.3.4
use tokio::runtime::Builder;
use tokio::time::Duration;

fn main() {
    let rt = Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(3 * 1024 * 1024)
        .build()
        .unwrap();

    rt.block_on(async {
        tokio::spawn(print_thread(1));
        tokio::spawn(print_thread(2));
        tokio::spawn(print_thread(3));
        tokio::spawn(print_thread(4));
    });
}

async fn print_thread(thread_num: usize) {
    tokio::spawn(async move {
        println!("thread{}-start", thread_num);
        loop {
            tokio::time::sleep(Duration::from_millis(1000)).await;
            println!("thread{}-running", thread_num);
        }
        println!("thread{}-start", thread_num);
    });
}
Run Code Online (Sandbox Code Playgroud)

运行这个时,我得到:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.64s
     Running `target/debug/time_test`
thread1-start
thread2-start …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
2268
查看次数