我有一个需要测试的异步函数。该函数使用一个mongodb::Database
对象来运行,因此我在函数中初始化连接setup()
并使用tokio_test::block_on()
将表达式包装await
在其中。
#[cfg(test)]
mod tests {
use mongodb::{options::ClientOptions, Client};
use tokio_test;
async fn setup() -> mongodb::Database {
tokio_test::block_on(async {
let client_uri = "mongodb://127.0.0.1:27017";
let options = ClientOptions::parse(&client_uri).await;
let client_result = Client::with_options(options.unwrap());
let client = client_result.unwrap();
client.database("my_database")
})
}
#[test]
fn test_something_async() {
// for some reason, test cannot be async
let DB = setup(); // <- the DB is impl std::future::Future type
// the DB variable will be used to run another
// …
Run Code Online (Sandbox Code Playgroud) 假设我想用 Tokio 同时下载两个网页......
我可以用以下方法来实现tokio::spawn()
:
async fn v1() {
let t1 = tokio::spawn(reqwest::get("https://example.com"));
let t2 = tokio::spawn(reqwest::get("https://example.org"));
let (r1, r2) = (t1.await.unwrap(), t2.await.unwrap());
println!("example.com = {}", r1.unwrap().status());
println!("example.org = {}", r2.unwrap().status());
}
Run Code Online (Sandbox Code Playgroud)
或者我可以通过以下方式实现tokio::join!()
:
async fn v2() {
let t1 = reqwest::get("https://example.com");
let t2 = reqwest::get("https://example.org");
let (r1, r2) = tokio::join!(t1, t2);
println!("example.com = {}", r1.unwrap().status());
println!("example.org = {}", r2.unwrap().status());
}
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,两个请求都是同时发生的。但是,在第二种情况下,两个请求在同一任务中运行,因此在同一线程上运行。
所以,我的问题是:
tokio::join!()
超过有优势吗tokio::spawn()
?我猜生成一个新任务的开销非常小,但真的是这样吗?
我正在使用 Tokio 1.1 来做异步事情。我有一个async
main
with #[tokio::main]
,所以我已经在使用运行时进行操作。
main
调用一个非异步方法,我希望await
在未来(具体来说,我正在从数据融合数据帧收集)。这种非异步方法具有由特征规定的签名,该特征返回结构体,而不是Future<Struct>
. 据我所知,我无法将其标记为异步。
如果我尝试打电话df.collect().await;
,我会得到
只允许在
async
函数和块内部
来自编译器的错误,指出我await
在其中调用的方法不是async
.
如果我尝试block_on
从一个新的运行时开始,未来是这样的:
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(df.collect());
Run Code Online (Sandbox Code Playgroud)
我遇到运行时恐慌:
无法从运行时内部启动运行时。发生这种情况是因为函数(如
block_on
)试图在当前线程用于驱动异步任务时阻止该线程。
如果我尝试futures::executor::block_on(df.collect()).unwrap();
,我会遇到新的运行时恐慌:
“当前未在 Tokio 0.2.x 运行时上运行。”
这很奇怪,因为我使用的是 Tokio v1.1。
这感觉比应该的更难。我处于异步上下文中,感觉编译器应该知道这一点并允许我.await
从方法内调用 - 唯一的代码路径从块内调用此方法async
。有没有一种简单的方法可以做到我所缺少的?
我面临着一种情况,我需要从对象的放置处理程序运行异步代码。整个应用程序在 tokio 异步上下文中运行,因此我知道 drop 处理程序是使用活动的 tokio 运行时调用的,但不幸的是 drop 本身是一个同步函数。
理想情况下,我想要一个适用于多线程和当前线程运行时的解决方案,但如果不存在,那么我可以使用阻止删除线程并依赖其他线程来驱动的解决方案期货。
我考虑了多种选择,但我不确定哪种方法最好,也不了解它们的权衡。对于这些示例,我们假设我的类有一个async terminate(&mut self)
我希望从 调用的函数drop()
。
struct MyClass;
impl MyClass {
async fn terminate(&mut self) {}
}
Run Code Online (Sandbox Code Playgroud)
选项1:tokio::runtime::Handle::block_on
impl Drop for MyClass {
fn drop(&mut self) {
tokio::runtime::Handle::current().block_on(self.terminate());
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎是最直接的方法,但不幸的是它会引起恐慌
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Run Code Online (Sandbox Code Playgroud)
看操场
我对此有点困惑,因为我认为Handle::block_on
会使用当前正在运行的运行时,但它似乎试图启动一个新的运行时?这里发生了什么? …
我无法创建尝试连接到服务器的客户端,并且:
这是连接到服务器的代码; 当前连接丢失时,程序退出.我不确定实现它的最佳方法是什么; 也许我必须创建一个Future
无限循环?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle(); …
Run Code Online (Sandbox Code Playgroud) 在 Node.js 中,我可以设置触发某个事件的时间间隔,
function intervalFunc() {
console.log('whelp, triggered again!');
}
setInterval(intervalFunc, 1500);
Run Code Online (Sandbox Code Playgroud)
然而,Tokio间隔的界面有点复杂。这似乎与间隔的更字面定义有关,而不是按间隔调用函数,它只是停止线程直到时间过去(使用.await
)。
Tokio 中是否有一个原语“每 x 秒”调用一个函数或类似函数?如果没有,是否有一个习语可以做到这一点?
我只需要定期运行一个函数...我也不关心其他线程。它只是 Tokio 事件循环中的一个函数。
与“普通”互斥体相比,什么是“异步”互斥体?Mutex
我相信这是 tokio和普通 std lib之间的区别Mutex
。但从概念上讲,我不明白互斥体如何“异步”。难道不是一次只有一件事可以使用它吗?
我正在使用关于他们的新板条箱的弹性搜索博客文章中的示例代码,但我无法让它按预期工作。线程恐慌thread 'main' panicked at 'not currently running on the Tokio runtime.'
。
什么是 Tokio 运行时,如何配置它,为什么必须配置?
use futures::executor::block_on;
async elastic_search_example() -> Result<(), Box<dyn Error>> {
let index_response = client
.index(IndexParts::IndexId("tweets", "1"))
.body(json!({
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
}))
.refresh(Refresh::WaitFor)
.send()
.await?;
if !index_response.status_code().is_success() {
panic!("indexing document failed")
}
let index_response = client
.index(IndexParts::IndexId("tweets", "2"))
.body(json!({
"user": "forloop",
"post_date": "2020-01-08T00:00:00Z",
"message": "Indexing with the rust client, yeah!"
}))
.refresh(Refresh::WaitFor)
.send()
.await?;
if !index_response.status_code().is_success() { …
Run Code Online (Sandbox Code Playgroud) 我正在关注mdns Rust 文档并粘贴了示例代码,但它引发了以下错误:
thread 'main' panicked at 'there is no reactor running, must be called from the context of Tokio runtime'
Run Code Online (Sandbox Code Playgroud)
这是我拥有的代码:
use futures_util::{pin_mut, stream::StreamExt};
use mdns::{Error, Record, RecordKind};
use std::{net::IpAddr, time::Duration};
const SERVICE_NAME: &'static str = "_googlecast._tcp.local";
#[tokio::main]
async fn main() -> Result<(), Error> {
// Iterate through responses from each Cast device, asking for new devices every 15s
let stream = mdns::discover::all(SERVICE_NAME, Duration::from_secs(15))?.listen();
pin_mut!(stream);
while let Some(Ok(response)) = stream.next().await {
let addr = response.records().filter_map(self::to_ip_addr).next();
if let …
Run Code Online (Sandbox Code Playgroud) 我不知道下一步该做什么。看起来我误解了一些东西,或者我可能没有学到一些关键的话题。
use std::sync::Arc;
use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0
pub struct Task {
pub id: u32,
pub url: String,
}
pub enum Message {
Failure(Task, Error),
Success(Task, Response),
}
struct State {
client: reqwest::Client,
res_tx: Sender<Message>,
res_rx: Receiver<Message>,
}
pub struct Proxy {
state: Arc<State>,
max_rps: u16,
max_pending: u16,
id: u32,
parent_tx: Sender<String>,
}
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
match tx.send(msg).await {
Err(error) => {
eprintln!("{}", error)
}
_ => (),
};
} …
Run Code Online (Sandbox Code Playgroud) rust ×10
rust-tokio ×10
async-await ×2
asynchronous ×1
future ×1
intervals ×1
loops ×1
mutex ×1
rust-futures ×1