我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()。
我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。
我是否遗漏了什么,或者我需要使用一些较低级别的 API?
我试图编译以下看似简单的代码,但出现错误:
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
从之前关于该主题的讨论和示例中,我了解到传递对 …
我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String
#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
let response = reqwest::get(url)
.await?
.text()
.await?;
Ok(response)
}
Run Code Online (Sandbox Code Playgroud)
我在 main() 中期望的是获取并打印响应值:
fn main() {
let response = download();
println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud) 下面的程序应该从多个线程定期打印,但它没有按我的预期工作:
# Cargo.toml
[dependencies]
tokio = { version = "0.3", features = ["full"] }
Run Code Online (Sandbox Code Playgroud)
use tokio::prelude::*; //0.3.4
use tokio::runtime::Builder;
use tokio::time::Duration;
fn main() {
let rt = Builder::new_multi_thread()
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap();
rt.block_on(async {
tokio::spawn(print_thread(1));
tokio::spawn(print_thread(2));
tokio::spawn(print_thread(3));
tokio::spawn(print_thread(4));
});
}
async fn print_thread(thread_num: usize) {
tokio::spawn(async move {
println!("thread{}-start", thread_num);
loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
println!("thread{}-running", thread_num);
}
println!("thread{}-start", thread_num);
});
}
Run Code Online (Sandbox Code Playgroud)
运行这个时,我得到:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.64s
Running `target/debug/time_test`
thread1-start
thread2-start …Run Code Online (Sandbox Code Playgroud) 我正在开发一个绑定到 Tokio 套接字并管理 TCP 连接的代码库。在生产中,它绑定到AF_VSOCK使用tokio-vsock板条箱。
在 Mac 上本地开发时,AF_VSOCKAPI 不可用,因为没有hypervisor -> VM连接 \xe2\x80\x94\xc2\xa0it\ 只是使用cargo run.
在本地运行时,我一直在创建一个标准tokio::net::TcpListener结构,在生产中我一直在创建一个tokio_vsock::VsockListener. 这两种结构大多可以互换并公开相同的方法。无论使用哪个结构,其余代码都可以完美运行。
到目前为止,我只是保留了这两个结构,并简单地注释掉了本地不需要的结构 \xe2\x80\x94 这显然不是“好的做法”。我的代码如下:
\n#[tokio::main]\nasync fn main() -> Result<(), ()> {\n // Production AF_VSOCK listener (comment out locally)\n let mut listener = tokio_vsock::VsockListener::bind(\n &SockAddr::Vsock(\n VsockAddr::new(\n VMADDR_CID_ANY,\n LISTEN_PORT,\n )\n )\n )\n .expect("Unable to bind AF_VSOCK listener");\n\n // Local TCP listener (comment out in production)\n let mut …Run Code Online (Sandbox Code Playgroud) 我知道 tokio 允许编写并发代码。但我不确定它是否并行运行。我的电脑有八个核心。所以理想情况下我运行的线程不超过八个。如果我需要更多并发性,我会在这些线程之上运行协程(使用 tokio)。
当然,除非 tokio 已经是多线程的了。在这种情况下,一开始就创建这八个线程将会适得其反。所以我想问的是,tokio 默认情况下是多线程的,还是我应该自己实现?
我的代码如下所示:
let fetches = futures::stream::iter(
hosts.into_iter().map(|url| {
async move {
match reqwest::get(&url).await {
// Ok and Err statements here!
}
Run Code Online (Sandbox Code Playgroud)
但是,这里的问题是,对于具有无效或自签名 SSL 证书的 URL,它会给出错误。所以,我尝试执行以下操作:
let fetches = futures::stream::iter(
hosts.into_iter().map(|url| {
async move {
match reqwest::Client::builder().danger_accept_invalid_certs(true).build().unwrap().get(&url).await {
// Ok and Err statements here!
}
Run Code Online (Sandbox Code Playgroud)
当我尝试使用 Cargo 构建它时,它显示“错误[E0277]:`RequestBuilder`不是未来”。
那么,如何让我的代码接受无效证书呢?
我尝试使用指示板条箱来显示子任务的多个进度条以及一个计算所有已完成任务的进度条。这是我的代码:
# Cargo.toml
[dependencies]
indicatif = "0.15.0"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Run Code Online (Sandbox Code Playgroud)
//! main.rs
use std::time::Duration;
use futures::{StreamExt, stream::futures_unordered::FuturesUnordered};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let m = MultiProgress::new();
let sty = ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}")
.progress_chars("##-");
let total_pb = m.add(ProgressBar::new(3));
total_pb.set_style(sty.clone());
let mut futs = FuturesUnordered::new();
let durations = [15u64, 8, 3];
let mut pb_cnt = 0usize;
for &duration in durations.iter() …Run Code Online (Sandbox Code Playgroud) 我正在尝试构建一个对象,该对象可以管理来自 websocket 的提要,但能够在多个提要之间切换。
有一个Feed特点:
trait Feed {
async fn start(&mut self);
async fn stop(&mut self);
}
Run Code Online (Sandbox Code Playgroud)
共有三个结构体实现Feed:A、B和C。
当start被调用时,它会启动一个无限循环,监听来自 websocket 的消息并处理每条传入的消息。
我想实现一个FeedManager维护单个活动源但可以接收命令来切换它正在使用的源的命令。
enum FeedCommand {
Start(String),
Stop,
}
struct FeedManager {
active_feed_handle: tokio::task::JoinHandle,
controller: mpsc::Receiver<FeedCommand>,
}
impl FeedManager {
async fn start(&self) {
while let Some(command) = self.controller.recv().await {
match command {
FeedCommand::Start(feed_type) => {
// somehow tell the active feed to stop (need channel probably) or …Run Code Online (Sandbox Code Playgroud) 我想利用 Tokio 的运行时来处理可变数量的异步 future。由于 futures 的数量在编译时是未知的,似乎FuturesUnordered是我最好的选择(宏,例如select!需要在编译时指定您的分支;join_all可能是可能的,但文档建议“在很多情况下”当 order 不可用时使用 FuturesUnordered )没关系)。
这段代码的逻辑是一个recv()循环被推送到futures桶中,它应该始终运行。当新数据到达时,它的解析/处理也被推送到 futures 桶(而不是立即处理)。这确保接收器在响应新事件时保持低延迟,并且数据处理(可能需要大量计算的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。
.boxed()顺便说一句,这个帖子解释了为什么期货会变得。
问题是这个神秘的错误:
错误[E0277] :`dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
--> src/main.rs:27:8
|
27 | 27 }).boxed());
| ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` 无法在线程之间安全共享
|
= help : `dyn futures::Future<Output = ()> + std::marker::Send` 未实现 `Sync` 特性
=注意:需要,因为 `Sync` 的 impl 要求Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
= note:必需的,因为它出现在类型 `Box<dyn futures::Future<Output = ()> … rust ×10
rust-tokio ×10
async-await ×5
rust-cargo ×2
compilation ×1
concurrency ×1
lifetime ×1
reqwest ×1