标签: rust-tokio

是否可以在Tokio中关闭TcpListener?

tokio_core::net::TcpListener创建了一个,然后调用incoming方法以获取传入连接流。然后,我for_each在该流上使用该方法将其变为将来,并在事件循环上运行将来。一旦执行此操作,以后是否有任何方法可以解除与端口的绑定?

如果不是,Tokio中是否还有其他API可用于创建可以关闭的TCP服务器?

rust rust-tokio

5
推荐指数
2
解决办法
1080
查看次数

如何优雅地关闭Tokio运行时以响应SIGTERM?

我有一个main函数,我在其中创建一个Tokio运行时并在其上运行两个期货.

use tokio;

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.spawn(MyMegaFutureNumberOne {});
    runtime.spawn(MyMegaFutureNumberTwo {});

    // Some code to 'join' them after receiving an OS signal
}
Run Code Online (Sandbox Code Playgroud)

我如何收到SIGTERM,等待所有未完成的任务NotReady并退出应用程序?

asynchronous future shutdown rust rust-tokio

5
推荐指数
2
解决办法
902
查看次数

为什么调用tokio :: spawn会导致恐慌“ SpawnError {is_shutdown:true}”?

我想Delay以后再用做一些工作。如果我使用tokio::run,它就可以正常工作,但是在使用时会出现恐慌tokio::spawn

use std::sync::mpsc;
use std::time::*;

use tokio::prelude::*; // 0.1.14

fn main() {
    let (tx, rx) = mpsc::channel();
    let task = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1))
        .map(move |_| {
            tx.send(String::from("hello")).unwrap();
            ()
        })
        .map_err(|e| {
            panic!("{:?}", e);
        });
    tokio::spawn(task);
    let msg = rx.recv().unwrap();
    println!("{}", msg);
}
Run Code Online (Sandbox Code Playgroud)
use std::sync::mpsc;
use std::time::*;

use tokio::prelude::*; // 0.1.14

fn main() {
    let (tx, rx) = mpsc::channel();
    let task = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(1))
        .map(move |_| {
            tx.send(String::from("hello")).unwrap();
            ()
        })
        .map_err(|e| {
            panic!("{:?}", e); …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
805
查看次数

如何从产生数据块的慢速处理侧线程流式传输超级请求的正文?

我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生大量数据;每个响应可以是 1GiB,不适合内存,并且必须按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。

让我们跳过样板(service_fn, Server::bind)。

缓慢生成数据的 API 可能类似于

use std::io;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}

type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());

    // spawn a thread and call slow.next_block() until len()==0
    // each …
Run Code Online (Sandbox Code Playgroud)

rust hyper rust-tokio

5
推荐指数
1
解决办法
2622
查看次数

如何在 rust/tokio 中的 TCP 客户端上绑定()?

我需要确保 TCP 连接的客户端通过特定的 (IP) 接口。标准方法是将bind()套接字连接到IP:0, 之前connect()

我开始查看tokio::net::TcpStream::connect()和朋友,似乎没有办法做到这一点。我退后一步看了看std::net::TcpStream,里面也没有。

我是否遗漏了什么,或者我需要使用一些较低级别的 API?

rust rust-tokio

5
推荐指数
1
解决办法
2689
查看次数

为什么 tokio::spawn 即使使用 .clone() 也会抱怨生命周期?

我试图编译以下看似简单的代码,但出现错误:

use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)
use std::io::Error;

#[derive(Debug)]
struct NetworkConfig {
    bind: String,
    node_key_file: String,
}

async fn network_handler(network_config: &NetworkConfig) -> Result<(), Error> {
    Ok(())
}

async fn run(network_config: &NetworkConfig) -> Result<(), Error> {
    let network_config_copy = network_config.clone();
    tokio::spawn(async move {
        network_handler(&network_config_copy).await
    }).await?
}
Run Code Online (Sandbox Code Playgroud)

从之前关于该主题的讨论和示例中,我了解到传递对 …

lifetime rust async-await rust-tokio

5
推荐指数
1
解决办法
2925
查看次数

从函数返回未来值

我最近开始学习 Rust,我不确定如何从应该返回 Result 的函数返回未来值。当我尝试仅返回响应变量并删除结果输出时,出现错误:无法在返回的函数中使用运算符?std::string::String

#[tokio::main]
async fn download() -> Result<(),reqwest::Error> {
    let url = "https://query1.finance.yahoo.com/v8/finance/chart/TSLA";
    let response = reqwest::get(url)
                            .await?
                            .text()
                            .await?;
    Ok(response)
 }
Run Code Online (Sandbox Code Playgroud)

我在 main() 中期望的是获取并打印响应值:

fn main() {
    let response = download();
    println!("{:?}", response)
}
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
8484
查看次数

tokio::time::timeout 之后再次等待 future

背景:
我有一个进程,用于tokio::process在 tokio 运行时生成带有句柄的子进程。

它还负责在杀死子进程后释放资源,并且根据文档(s​​td::process::Childtokio::process::Child),这需要父进程wait()(或await在 tokio 中)进行该进程。

SIGINT并非所有进程对 a或 a 的行为都相同SIGTERM,因此我想在发送 a 之前给孩子一些时间来死掉SIGKILL

所需的解决方案:

    pub async fn kill(self) {
        // Close input
        std::mem::drop(self.stdin);

        // Send gracefull signal
        let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
        nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);

        // Give the process time to die gracefully
        if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
        {
            // Kill forcefully
            nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
            self.process.await;
        }
    }
Run Code Online (Sandbox Code Playgroud)

但是给出了这个错误:

error[E0382]: use of moved value: `self.process` …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

5
推荐指数
1
解决办法
7135
查看次数

包装 AsyncRead

我似乎无法让编译器让我包装 Tokio AsyncRead:

use std::io::Result;
use core::pin::Pin;
use core::task::{Context, Poll};

use tokio::io::AsyncRead;

struct Wrapper<T: AsyncRead>{
    inner: T
}

impl<T: AsyncRead> AsyncRead for Wrapper<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8]
    ) -> Poll<Result<usize>> {
        self.inner.poll_read(cx, buf)
    }
}
Run Code Online (Sandbox Code Playgroud)

这似乎应该编译,但编译器抱怨我没有包含正确的特征绑定,即使poll_read可以通过 AsyncRead: Playground Link获得

error[E0599]: no method named `poll_read` found for type parameter `T` in the current scope
  --> src/lib.rs:17:20
   |
17 |         self.inner.poll_read(cx, buf)
   |                    ^^^^^^^^^ method not found in `T`
   |
   = help: …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

5
推荐指数
1
解决办法
1204
查看次数

当使用 tokio::spawn 和带有可变引用的 future 时,“借用的值存在的时间不够长”

以下代码无法编译,因为编译器无法保证hashmap_of_lists其寿命足够长。我无法克服这一点。

我尝试过使用ArcandMutex但由于内部some_func使用的异步方式,我遇到了其他问题Mutex

use futures; // 0.3.5
use std::collections::HashMap;
use tokio; // 0.2.21

async fn some_func(_some_slice: &mut [String]) {}

#[tokio::main]
async fn main() {
    let mut hashmap_of_lists = HashMap::<String, Vec<String>>::new();
    let mut join_handles = Vec::new();

    for (_, value) in hashmap_of_lists.iter_mut() {
        let future = some_func(value);
        let join_handle = tokio::task::spawn(future);
        join_handles.push(join_handle);
    }

    futures::future::join_all(join_handles).await;
}
Run Code Online (Sandbox Code Playgroud)

我收到这个错误

error[E0597]: `hashmap_of_lists` does not live long enough
  --> src/main.rs:12:23
   |
12 |     for (_, value) in hashmap_of_lists.iter_mut() …
Run Code Online (Sandbox Code Playgroud)

asynchronous reference rust rust-tokio borrow

5
推荐指数
0
解决办法
2070
查看次数