标签: rust-tokio

如何测试使用 Tokio 的异步函数?

我有一个需要测试的异步函数。该函数使用一个mongodb::Database对象来运行,因此我在函数中初始化连接setup()并使用tokio_test::block_on()将表达式包装await在其中。

#[cfg(test)]
mod tests {
    use mongodb::{options::ClientOptions, Client};
    use tokio_test;

    async fn setup() -> mongodb::Database {
        tokio_test::block_on(async {
            let client_uri = "mongodb://127.0.0.1:27017";
            let options = ClientOptions::parse(&client_uri).await;
            let client_result = Client::with_options(options.unwrap());
            let client = client_result.unwrap();
            client.database("my_database")
        })
    }

    #[test]
    fn test_something_async() {
        // for some reason, test cannot be async
        let DB = setup(); // <- the DB is impl std::future::Future type

        // the DB variable will be used to run another
        // …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

77
推荐指数
1
解决办法
3万
查看次数

什么时候应该使用 tokio::join!() 而不是 tokio::spawn()?

假设我想用 Tokio 同时下载两个网页......

我可以用以下方法来实现tokio::spawn()

async fn v1() {
    let t1 = tokio::spawn(reqwest::get("https://example.com"));
    let t2 = tokio::spawn(reqwest::get("https://example.org"));
    let (r1, r2) = (t1.await.unwrap(), t2.await.unwrap());
    println!("example.com = {}", r1.unwrap().status());
    println!("example.org = {}", r2.unwrap().status());
}
Run Code Online (Sandbox Code Playgroud)

或者我可以通过以下方式实现tokio::join!()

async fn v2() {
    let t1 = reqwest::get("https://example.com");
    let t2 = reqwest::get("https://example.org");
    let (r1, r2) = tokio::join!(t1, t2);
    println!("example.com = {}", r1.unwrap().status());
    println!("example.org = {}", r2.unwrap().status());
}
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,两个请求都是同时发生的。但是,在第二种情况下,两个请求在同一任务中运行,因此在同一线程上运行。

所以,我的问题是:

  • tokio::join!()超过有优势吗tokio::spawn()
  • 如果是的话,在什么场景下?(它与下载网页无关)

我猜生成一个新任务的开销非常小,但真的是这样吗?

rust rust-tokio

27
推荐指数
2
解决办法
2万
查看次数

如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待 future?

我正在使用 Tokio 1.1 来做异步事情。我有一个async mainwith #[tokio::main],所以我已经在使用运行时进行操作。

main调用一个非异步方法,我希望await在未来(具体来说,我正在从数据融合数据帧收集)。这种非异步方法具有由特征规定的签名,该特征返回结构体,而不是Future<Struct>. 据我所知,我无法将其标记为异步。

如果我尝试打电话df.collect().await;,我会得到

只允许在async函数和块内部

来自编译器的错误,指出我await在其中调用的方法不是async.

如果我尝试block_on从一个新的运行时开始,未来是这样的:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());
Run Code Online (Sandbox Code Playgroud)

我遇到运行时恐慌:

无法从运行时内部启动运行时。发生这种情况是因为函数(如block_on)试图在当前线程用于驱动异步任务时阻止该线程。

如果我尝试futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:

“当前未在 Tokio 0.2.x 运行时上运行。”

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比应该的更难。我处于异步上下文中,感觉编译器应该知道这一点并允许我.await从方法内调用 - 唯一的代码路径从块内调用此方法async。有没有一种简单的方法可以做到我所缺少的?

future rust async-await rust-tokio

24
推荐指数
1
解决办法
2万
查看次数

Rust 异步删除

我面临着一种情况,我需要从对象的放置处理程序运行异步代码。整个应用程序在 tokio 异步上下文中运行,因此我知道 drop 处理程序是使用活动的 tokio 运行时调用的,但不幸的是 drop 本身是一个同步函数。

理想情况下,我想要一个适用于多线程和当前线程运行时的解决方案,但如果不存在,那么我可以使用阻止删除线程并依赖其他线程来驱动的解决方案期货。

我考虑了多种选择,但我不确定哪种方法最好,也不了解它们的权衡。对于这些示例,我们假设我的类有一个async terminate(&mut self)我希望从 调用的函数drop()

struct MyClass;
impl MyClass {
    async fn terminate(&mut self) {}
}
Run Code Online (Sandbox Code Playgroud)

选项1:tokio::runtime::Handle::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        tokio::runtime::Handle::current().block_on(self.terminate());
    }
}
Run Code Online (Sandbox Code Playgroud)

这似乎是最直接的方法,但不幸的是它会引起恐慌

Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Run Code Online (Sandbox Code Playgroud)

操场

我对此有点困惑,因为我认为Handle::block_on会使用当前正在运行的运行时,但它似乎试图启动一个新的运行时?这里发生了什么? …

asynchronous rust async-await rust-tokio rust-futures

23
推荐指数
1
解决办法
4688
查看次数

在无限循环中异步重新连接客户端到服务器

我无法创建尝试连接到服务器的客户端,并且:

  • 如果服务器关闭,它必须在无限循环中再次尝试
  • 如果服务器已启动并且连接成功,则当连接丢失时(即服务器断开客户端连接),客户端必须重新启动无限循环以尝试连接到服务器

这是连接到服务器的代码; 当前连接丢失时,程序退出.我不确定实现它的最佳方法是什么; 也许我必须创建一个Future无限循环?

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle(); …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

21
推荐指数
1
解决办法
2603
查看次数

如何使用 Tokio 在每个周期或间隔(以秒为单位)触发函数?

在 Node.js 中,我可以设置触发某个事件的时间间隔,

function intervalFunc() {
  console.log('whelp, triggered again!');
}

setInterval(intervalFunc, 1500);
Run Code Online (Sandbox Code Playgroud)

然而,Tokio间隔的界面有点复杂。这似乎与间隔的更字面定义有关,而不是按间隔调用函数,它只是停止线程直到时间过去(使用.await)。

Tokio 中是否有一个原语“每 x 秒”调用一个函数或类似函数?如果没有,是否有一个习语可以做到这一点?

我只需要定期运行一个函数...我也不关心其他线程。它只是 Tokio 事件循环中的一个函数。

loops intervals rust rust-tokio

20
推荐指数
1
解决办法
1万
查看次数

std::sync::Mutex 与 tokio::sync::Mutex 有什么区别?

与“普通”互斥体相比,什么是“异步”互斥体?Mutex我相信这是 tokio和普通 std lib之间的区别Mutex。但从概念上讲,我不明白互斥体如何“异步”。难道不是一次只有一件事可以使用它吗?

mutex rust rust-tokio

19
推荐指数
1
解决办法
6906
查看次数

使用期货箱中的 block_on 时,为什么我会“对'当前未在 Tokio 运行时上运行'感到恐慌”?

我正在使用关于他们的新板条箱的弹性搜索博客文章中的示例代码,但我无法让它按预期工作。线程恐慌thread 'main' panicked at 'not currently running on the Tokio runtime.'

什么是 Tokio 运行时,如何配置它,为什么必须配置?

use futures::executor::block_on;

async elastic_search_example() -> Result<(), Box<dyn Error>> {
    let index_response = client
        .index(IndexParts::IndexId("tweets", "1"))
        .body(json!({
            "user": "kimchy",
            "post_date": "2009-11-15T00:00:00Z",
            "message": "Trying out Elasticsearch, so far so good?"
        }))
        .refresh(Refresh::WaitFor)
        .send()
        .await?;
    if !index_response.status_code().is_success() {
        panic!("indexing document failed")
    }
    let index_response = client
        .index(IndexParts::IndexId("tweets", "2"))
        .body(json!({
            "user": "forloop",
            "post_date": "2020-01-08T00:00:00Z",
            "message": "Indexing with the rust client, yeah!"
        }))
        .refresh(Refresh::WaitFor)
        .send()
        .await?;
    if !index_response.status_code().is_success() { …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

18
推荐指数
2
解决办法
8590
查看次数

为什么我收到错误“没有反应器正在运行,必须从 Tokio 运行时的上下文中调用”,即使我有 #[tokio::main]?

我正在关注mdns Rust 文档并粘贴了示例代码,但它引发了以下错误:

thread 'main' panicked at 'there is no reactor running, must be called from the context of Tokio runtime'
Run Code Online (Sandbox Code Playgroud)

这是我拥有的代码:

use futures_util::{pin_mut, stream::StreamExt};
use mdns::{Error, Record, RecordKind};
use std::{net::IpAddr, time::Duration};

const SERVICE_NAME: &'static str = "_googlecast._tcp.local";

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Iterate through responses from each Cast device, asking for new devices every 15s
    let stream = mdns::discover::all(SERVICE_NAME, Duration::from_secs(15))?.listen();
    pin_mut!(stream);

    while let Some(Ok(response)) = stream.next().await {
        let addr = response.records().filter_map(self::to_ip_addr).next();

        if let …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

18
推荐指数
3
解决办法
9714
查看次数

无法借用“Arc”中的可变数据

我不知道下一步该做什么。看起来我误解了一些东西,或者我可能没有学到一些关键的话题。

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task {
    pub id: u32,
    pub url: String,
}
pub enum Message {
    Failure(Task, Error),
    Success(Task, Response),
}

struct State {
    client: reqwest::Client,
    res_tx: Sender<Message>,
    res_rx: Receiver<Message>,
}

pub struct Proxy {
    state: Arc<State>,
    max_rps: u16,
    max_pending: u16,
    id: u32,
    parent_tx: Sender<String>,
}

async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    match tx.send(msg).await {
        Err(error) => {
            eprintln!("{}", error)
        }
        _ => (),
    };
} …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

18
推荐指数
2
解决办法
3万
查看次数