我tokio_core::net::TcpListener创建了一个,然后调用incoming方法以获取传入连接流。然后,我for_each在该流上使用该方法将其变为将来,并在事件循环上运行将来。一旦执行此操作,以后是否有任何方法可以解除与端口的绑定?
如果不是,Tokio中是否还有其他API可用于创建可以关闭的TCP服务器?
我有一个main函数,我在其中创建一个Tokio运行时并在其上运行两个期货.
use tokio;
fn main() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(MyMegaFutureNumberOne {});
runtime.spawn(MyMegaFutureNumberTwo {});
// Some code to 'join' them after receiving an OS signal
}
Run Code Online (Sandbox Code Playgroud)
我如何收到SIGTERM,等待所有未完成的任务NotReady并退出应用程序?
我想Delay以后再用做一些工作。如果我使用tokio::run,它就可以正常工作,但是在使用时会出现恐慌tokio::spawn:
use std::sync::mpsc;
use std::time::*;
use tokio::prelude::*; // 0.1.14
fn main() {
let (tx, rx) = mpsc::channel();
let task = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1))
.map(move |_| {
tx.send(String::from("hello")).unwrap();
()
})
.map_err(|e| {
panic!("{:?}", e);
});
tokio::spawn(task);
let msg = rx.recv().unwrap();
println!("{}", msg);
}
Run Code Online (Sandbox Code Playgroud)
use std::sync::mpsc;
use std::time::*;
use tokio::prelude::*; // 0.1.14
fn main() {
let (tx, rx) = mpsc::channel();
let task = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1))
.map(move |_| {
tx.send(String::from("hello")).unwrap();
()
})
.map_err(|e| {
panic!("{:?}", e); …Run Code Online (Sandbox Code Playgroud) 我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生大量数据;每个响应可以是 1GiB,不适合内存,并且必须按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。
让我们跳过样板(service_fn, Server::bind)。
缓慢生成数据的 API 可能类似于
use std::io;
impl SlowData {
fn new(initial: &str) -> SlowData {
unimplemented!()
}
fn next_block(&self) -> io::Result<&[u8]> {
unimplemented!()
}
}
type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;
fn run(req: Request) -> ResponseFuture {
// spawn a thread and:
// initialize the generator
// SlowData::new(&req.uri().path());
// spawn a thread and call slow.next_block() until len()==0
// each …Run Code Online (Sandbox Code Playgroud) 我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()。
我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。
我是否遗漏了什么,或者我需要使用一些较低级别的 API?
我试图编译以下看似简单的代码,但出现错误:
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;
#[derive(Debug)]
struct NetworkConfig {
bind: String,
node_key_file: String,
}
async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
Ok(())
}
async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
let network_config_copy = network_config.clone();
tokio::spawn(async move {
network_handler(&network_config_copy).await
}).await?
}
Run Code Online (Sandbox Code Playgroud)
从之前关于该主题的讨论和示例中,我了解到传递对 …
我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String
#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
let response = reqwest::get(url)
.await?
.text()
.await?;
Ok(response)
}
Run Code Online (Sandbox Code Playgroud)
我在 main() 中期望的是获取并打印响应值:
fn main() {
let response = download();
println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud) 背景:
我有一个进程,用于tokio::process在 tokio 运行时生成带有句柄的子进程。
它还负责在杀死子进程后释放资源,并且根据文档(std::process::Child、tokio::process::Child),这需要父进程wait()(或await在 tokio 中)进行该进程。
SIGINT并非所有进程对 a或 a 的行为都相同SIGTERM,因此我想在发送 a 之前给孩子一些时间来死掉SIGKILL。
所需的解决方案:
pub async fn kill(self) {
// Close input
std::mem::drop(self.stdin);
// Send gracefull signal
let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);
// Give the process time to die gracefully
if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
{
// Kill forcefully
nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
self.process.await;
}
}
Run Code Online (Sandbox Code Playgroud)
但是给出了这个错误:
error[E0382]: use of moved value: `self.process` …Run Code Online (Sandbox Code Playgroud) 我似乎无法让编译器让我包装 Tokio AsyncRead:
use std::io::Result;
use core::pin::Pin;
use core::task::{Context, Poll};
use tokio::io::AsyncRead;
struct Wrapper<T: AsyncRead>{
inner: T
}
impl<T: AsyncRead> AsyncRead for Wrapper<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<Result<usize>> {
self.inner.poll_read(cx, buf)
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎应该编译,但编译器抱怨我没有包含正确的特征绑定,即使poll_read可以通过 AsyncRead: Playground Link获得
error[E0599]: no method named `poll_read` found for type parameter `T` in the current scope
--> src/lib.rs:17:20
|
17 | self.inner.poll_read(cx, buf)
| ^^^^^^^^^ method not found in `T`
|
= help: …Run Code Online (Sandbox Code Playgroud) 以下代码无法编译,因为编译器无法保证hashmap_of_lists其寿命足够长。我无法克服这一点。
我尝试过使用ArcandMutex但由于内部some_func使用的异步方式,我遇到了其他问题Mutex。
use futures; // 0.3.5
use std::collections::HashMap;
use tokio; // 0.2.21
async fn some_func(_some_slice: &mut [String]) {}
#[tokio::main]
async fn main() {
let mut hashmap_of_lists = HashMap::<String, Vec<String>>::new();
let mut join_handles = Vec::new();
for (_, value) in hashmap_of_lists.iter_mut() {
let future = some_func(value);
let join_handle = tokio::task::spawn(future);
join_handles.push(join_handle);
}
futures::future::join_all(join_handles).await;
}
Run Code Online (Sandbox Code Playgroud)
我收到这个错误
error[E0597]: `hashmap_of_lists` does not live long enough
--> src/main.rs:12:23
|
12 | for (_, value) in hashmap_of_lists.iter_mut() …Run Code Online (Sandbox Code Playgroud) rust ×10
rust-tokio ×10
async-await ×2
asynchronous ×2
borrow ×1
future ×1
hyper ×1
lifetime ×1
reference ×1
shutdown ×1