我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()。
我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。
我是否遗漏了什么,或者我需要使用一些较低级别的 API?
我试图编译以下看似简单的代码,但出现错误:
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
从之前关于该主题的讨论和示例中,我了解到传递对 …
我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String
#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
let response = reqwest::get(url)
.await?
.text()
.await?;
Ok(response)
}
Run Code Online (Sandbox Code Playgroud)
我在 main() 中期望的是获取并打印响应值:
fn main() {
let response = download();
println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud) 背景:
我有一个进程,用于tokio::process在 tokio 运行时生成带有句柄的子进程。
它还负责在杀死子进程后释放资源,并且根据文档(std::process::Child、tokio::process::Child),这需要父进程wait()(或await在 tokio 中)进行该进程。
SIGINT并非所有进程对 a或 a 的行为都相同SIGTERM,因此我想在发送 a 之前给孩子一些时间来死掉SIGKILL。
所需的解决方案:
pub async fn kill(self) {
// Close input
std::mem::drop(self.stdin);
// Send gracefull signal
let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);
// Give the process time to die gracefully
if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
{
// Kill forcefully
nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
self.process.await;
}
}
Run Code Online (Sandbox Code Playgroud)
但是给出了这个错误:
error[E0382]: use of moved value: `self.process` …Run Code Online (Sandbox Code Playgroud) 我似乎无法让编译器让我包装 Tokio AsyncRead:
use std::io::Result;
use core::pin::Pin;
use core::task::{Context, Poll};
use tokio::io::AsyncRead;
struct Wrapper<T: AsyncRead>{
inner: T
}
impl<T: AsyncRead> AsyncRead for Wrapper<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<Result<usize>> {
self.inner.poll_read(cx, buf)
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎应该编译,但编译器抱怨我没有包含正确的特征绑定,即使poll_read可以通过 AsyncRead: Playground Link获得
error[E0599]: no method named `poll_read` found for type parameter `T` in the current scope
--> src/lib.rs:17:20
|
17 | self.inner.poll_read(cx, buf)
| ^^^^^^^^^ method not found in `T`
|
= help: …Run Code Online (Sandbox Code Playgroud) 以下代码无法编译,因为编译器无法保证hashmap_of_lists其寿命足够长。我无法克服这一点。
我尝试过使用ArcandMutex但由于内部some_func使用的异步方式,我遇到了其他问题Mutex。
use futures; // 0.3.5
use std::collections::HashMap;
use tokio; // 0.2.21
async fn some_func(_some_slice: &mut [String]) {}
#[tokio::main]
async fn main() {
let mut hashmap_of_lists = HashMap::<String, Vec<String>>::new();
let mut join_handles = Vec::new();
for (_, value) in hashmap_of_lists.iter_mut() {
let future = some_func(value);
let join_handle = tokio::task::spawn(future);
join_handles.push(join_handle);
}
futures::future::join_all(join_handles).await;
}
Run Code Online (Sandbox Code Playgroud)
我收到这个错误
error[E0597]: `hashmap_of_lists` does not live long enough
--> src/main.rs:12:23
|
12 | for (_, value) in hashmap_of_lists.iter_mut() …Run Code Online (Sandbox Code Playgroud) 我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。
在理想的世界中,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
Run Code Online (Sandbox Code Playgroud)
问题是Sender没有is_closed方法。它确实有pub fn …
在tokio中,当处理器完成运行队列中的所有任务时,它们是首先在全局队列中查找更多任务,还是首先尝试从同级处理器中窃取工作?
我一直在寻找 tokio 源代码来获取问题的答案,我的印象是sleep 方法实际上放置了一个带有持续时间的计时器,但我认为我可能误解了代码,因为这样做效率非常低。是否可以更清楚地了解这一点?
下面的程序应该从多个线程定期打印,但它没有按我的预期工作:
# Cargo.toml
[dependencies]
tokio = { version = "0.3", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
use tokio::prelude::*; //0.3.4
use tokio::runtime::Builder;
use tokio::time::Duration;
fn main() {
let rt = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap();
rt.block_on(async {
tokio::spawn(print_thread(1));
tokio::spawn(print_thread(2));
tokio::spawn(print_thread(3));
tokio::spawn(print_thread(4));
});
}
async fn print_thread(thread_num: usize) {
tokio::spawn(async move {
println!("thread{}-start", thread_num);
loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
println!("thread{}-running", thread_num);
}
println!("thread{}-start", thread_num);
});
}
Run Code Online (Sandbox Code Playgroud)
运行这个时,我得到:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.64s
Running `target/debug/time_test`
thread1-start
thread2-start …Run Code Online (Sandbox Code Playgroud) rust ×10
rust-tokio ×10
async-await ×3
asynchronous ×1
borrow ×1
future ×1
lifetime ×1
reference ×1