我正在使用 Tokio 1.1 来做异步事情。我有一个async mainwith #[tokio::main],所以我已经在使用运行时进行操作。
main调用一个非异步方法,我希望await在未来(具体来说,我正在从数据融合数据帧收集)。这种非异步方法具有由特征规定的签名,该特征返回结构体,而不是Future<Struct>. 据我所知,我无法将其标记为异步。
如果我尝试打电话df.collect().await;,我会得到
只允许在
async函数和块内部
来自编译器的错误,指出我await在其中调用的方法不是async.
如果我尝试block_on从一个新的运行时开始,未来是这样的:
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(df.collect());
Run Code Online (Sandbox Code Playgroud)
我遇到运行时恐慌:
无法从运行时内部启动运行时。发生这种情况是因为函数(如
block_on)试图在当前线程用于驱动异步任务时阻止该线程。
如果我尝试futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:
“当前未在 Tokio 0.2.x 运行时上运行。”
这很奇怪,因为我使用的是 Tokio v1.1。
这感觉比应该的更难。我处于异步上下文中,感觉编译器应该知道这一点并允许我.await从方法内调用 - 唯一的代码路径从块内调用此方法async。有没有一种简单的方法可以做到我所缺少的?
在像C#这样的语言中,给出这个代码(我没有await故意使用关键字):
async Task Foo()
{
var task = LongRunningOperationAsync();
// Some other non-related operation
AnotherOperation();
result = task.Result;
}
Run Code Online (Sandbox Code Playgroud)
在第一行中,long操作在另一个线程中运行,并Task返回a(即未来).然后,您可以执行另一个与第一个并行运行的操作,最后,您可以等待操作完成.我认为,这也是行为async/ await在Python,JavaScript等
另一方面,在Rust中,我在RFC中读到:
Rust的期货与其他语言的期货之间的根本区别在于,除非进行调查,否则Rust的期货不会做任何事情.整个系统是围绕这个建立的:例如,取消正在降低未来正是出于这个原因.相比之下,在其他语言中,调用异步fn会旋转一个立即开始执行的未来.
在这种情况下,是什么目的async/ await鲁斯特?看到其他语言,这种表示法是一种运行并行操作的便捷方式,但是如果调用async函数没有运行任何东西,我无法看到它在Rust中是如何工作的.
我有一个基于Tokio的 Rust 异步服务器运行时。它必须同时处理对延迟敏感的 I/O 密集型请求和大量 CPU 密集型请求。
我不想让 CPU 密集型任务垄断 Tokio 运行时并使 I/O 密集型任务饿死,所以我想将 CPU 密集型任务卸载到专用的、隔离的线程池(隔离是这里的关键,所以spawn_blocking/block_in_place在一个共享线程池上是不够的)。如何在 Tokio 中创建这样的线程池?
启动两个运行时的幼稚方法会遇到错误:
线程“tokio-runtime-worker”因“无法从运行时内启动运行时”而恐慌。发生这种情况是因为一个函数(如
block_on)试图在当前线程被用于驱动异步任务时阻塞当前线程。
use tokio; // 0.2.20
fn main() {
let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!
main_runtime.block_on(main_runtime.spawn(async move {
cpu_pool.spawn(async {}).await
}))
.unwrap().unwrap();
}
Run Code Online (Sandbox Code Playgroud)
Tokio 可以允许两个独立的运行时吗?有没有更好的方法在 Tokio 中创建隔离的 CPU 池?
在task文档中,有一节讨论了在异步中调用阻塞代码,以及如何避免这种情况,以免过多地阻塞异步线程(https://docs.rs/tokio/1.21.2/tokio/task/ index.html#blocking-and-yielding)。
它还讨论了用于tokio::task::spawn_blocking这些任务的方法,但我想知道在什么时候建议将工作发送到不同的线程?我目前正在编写一个程序,可以恢复大量的 ECDSA 签名,每条消息大约需要 100 微秒,同时进行大量的网络 IO。作为一个具体的例子,这足以使用类似的东西吗spawn_blocking?
我正在通过r2d2使用柴油的actix-web应用程序上工作,并且不确定如何最好地进行异步查询。我发现了三个看似合理的选择,但不确定哪个是最好的。
对于我来说,我可以使用actix示例,但是它非常复杂,需要大量样板来构建。我希望有一个更合理的解决方案。
Actix_web::web::block作为另一种选择,我可以使用将actix_web::web::block查询功能包装到将来,但是我不确定这样做的性能含义。
然后,该查询是否在同一Tokio系统中运行?从我在源代码中可以找到的地方,它在基础的actix-web线程池中创建了一个线程。那是问题吗?
如果我没看错代码,则r2d2在获取连接时会阻塞其线程,这会阻塞部分核心actix-web池。与数据库查询相同。如果我执行的查询多于该池中的线程数,那么这将阻止所有actix-web?如果是这样,那就大问题了。
最后,可能会有一些不必要开销的安全选择是futures-cpupool。主要问题是,这意味着要向我的项目中添加另一个板条箱,尽管我不喜欢不必要地在应用程序中浮动多个cpu池的想法。
由于r2d2和柴油都会阻塞,因此这里有令人惊讶的棘手事情。
最重要的是,不要与不使用同一r2d2池的任何事物共享此cpupool(因为创建的所有线程可能只是阻塞等待r2d2连接,在工作存在时锁定整个池)。
其次(更明显一点),因此您不应该拥有比池中线程更多的r2d2连接,反之亦然,因为更大的r2d2连接会浪费资源(连接未使用/线程不断被阻塞)(也许还有一个线程,也许更快)由OS调度程序而不是cpupool调度程序进行连接切换)。
最后,请注意您正在使用的数据库以及那里的性能。在写繁琐的sqlite应用程序中运行单个连接r2d2和池中的单个线程可能是最好的选择(尽管我会为此推荐一个合适的数据库)。
https://www.reddit.com/r/rust/comments/axy0hp/patterns_to_scale_actixweb_and_diesel/
本质上,建议使用Futures-cpupool。
对于一般情况,建议使用Futures-cpupool。
https://www.reddit.com/r/rust/comments/9fe1ye/noob_here_can_we_talk_about_async_and_databases/
对旧的actix-web版本的一个非常好的修复。从我可以找到的请求中,不再有CPU池。
我有一个缓慢的未来,在运行完成之前阻塞了 1 秒。
我尝试使用join组合器,但复合未来my_app按顺序执行期货:
#![feature(pin, futures_api, arbitrary_self_types)]
extern crate futures; // v0.3
use futures::prelude::*;
use futures::task::Context;
use std::pin::PinMut;
use std::{thread, time};
use futures::executor::ThreadPoolBuilder;
struct SlowComputation {}
impl Future for SlowComputation {
type Output = ();
fn poll(self: PinMut<Self>, _cx: &mut Context) -> Poll<Self::Output> {
let millis = time::Duration::from_millis(1000);
thread::sleep(millis);
Poll::Ready(())
}
}
fn main() {
let fut1 = SlowComputation {};
let fut2 = SlowComputation {};
let my_app = fut1.join(fut2);
ThreadPoolBuilder::new()
.pool_size(5)
.create()
.expect("Failed to create threadpool") …Run Code Online (Sandbox Code Playgroud) 我有一个程序可以缓慢地生成数据(我们可以说它是计算密集型的,就像计算 pi 的数字一样)。它产生大量数据;每个响应可以是 1GiB,不适合内存,并且必须按需生成。我正在使用 hyper 编写一个 Web 服务来根据请求生成内容。
让我们跳过样板(service_fn, Server::bind)。
缓慢生成数据的 API 可能类似于
use std::io;
impl SlowData {
fn new(initial: &str) -> SlowData {
unimplemented!()
}
fn next_block(&self) -> io::Result<&[u8]> {
unimplemented!()
}
}
type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;
fn run(req: Request) -> ResponseFuture {
// spawn a thread and:
// initialize the generator
// SlowData::new(&req.uri().path());
// spawn a thread and call slow.next_block() until len()==0
// each …Run Code Online (Sandbox Code Playgroud) Rust 有async可以与Abortable期货相关联的方法。文档说,中止时:
未来将立即完成,没有任何进一步的进展。
绑定到未来的任务所拥有的变量会被删除吗?如果这些变量实现了drop,会drop被调用吗?如果未来产生了其他未来,它们是否会在一个链中流产?
例如:在下面的代码片段中,我没有看到为中止的任务发生析构函数,但我不知道它是否未被调用或发生在未显示打印的单独线程中。
use futures::executor::block_on;
use futures::future::{AbortHandle, Abortable};
struct S {
i: i32,
}
impl Drop for S {
fn drop(&mut self) {
println!("dropping S");
}
}
async fn f() -> i32 {
let s = S { i: 42 };
std::thread::sleep(std::time::Duration::from_secs(2));
s.i
}
fn main() {
println!("first test...");
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let _ = Abortable::new(f(), abort_registration);
abort_handle.abort();
std::thread::sleep(std::time::Duration::from_secs(1));
println!("second test...");
let …Run Code Online (Sandbox Code Playgroud) 我试图理解Future::select:在这个例子中,首先返回具有较长时间延迟的未来.
当我通过它的例子阅读这篇文章时,我得到了认知失调.作者写道:
该
select函数运行两个(或者更多select_all)期货,并返回第一个完成.这对于实现超时很有用.
看来我不明白这种感觉select.
extern crate futures;
extern crate tokio_core;
use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;
struct Timeout {
time: u32,
}
impl Timeout {
fn new(period: u32) -> Timeout {
Timeout { time: period }
}
}
impl Future for Timeout {
type Item = u32;
type Error = String;
fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
thread::sleep(Duration::from_secs(self.time as u64));
println!("Timeout is done with time {}.", …Run Code Online (Sandbox Code Playgroud) 来自Java,我习惯于遵循以下习惯用法
while (true) {
try {
someBlockingOperation();
} catch (InterruptedException e) {
Thread.currentThread.interrupt(); // re-set the interrupted flag
cleanup(); // whatever is necessary
break;
}
}
Run Code Online (Sandbox Code Playgroud)
据我所知,它可以在整个JDK中处理任何可能阻塞的内容,例如从文件读取,从套接字读取,从队列读取甚至对于Thread.sleep()。
阅读有关如何在Rust中完成此操作的信息,我发现了许多看似特殊的解决方案,例如mio,tokio。我也找到ErrorKind::Interrupted并尝试ErrorKind通过发送SIGINT到线程来解决这个问题,但是线程似乎立即死亡而没有留下任何(返回)跟踪。
这是我使用的代码(注意:Rust还不很精通,因此看起来可能有些奇怪,但是可以运行):
use std::io;
use std::io::Read;
use std::thread;
pub fn main() {
let sub_thread = thread::spawn(|| {
let mut buffer = [0; 10];
loop {
let d = io::stdin().read(&mut buffer);
println!("{:?}", d);
let n = d.unwrap();
if n == 0 { …Run Code Online (Sandbox Code Playgroud) 我的目标是同时运行 N 个函数,但在所有函数完成之前不想生成更多函数。这是我到目前为止所拥有的:
extern crate tokio;
extern crate futures;
use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;
fn main() {
let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
.for_each(|interval| {
println!("Interval: {:?}", interval);
for i in 0..5 {
tokio::spawn(lazy(move || {
println!("Hello from task {}", i);
// mock delay (something blocking)
// thread::sleep(time::Duration::from_secs(3));
Command::new("sleep").arg("3").output().expect("failed to execute process");
Ok(())
}));
}
Ok(())
})
.map_err(|e| panic!("interval errored; err={:?}", e));
tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)
我每秒生成 5 个函数,但现在我想等到所有函数完成后再生成更多函数。
根据我的理解(我的想法可能是错误的),我将Future 在另一个未来返回
extern crate tokio;
extern …Run Code Online (Sandbox Code Playgroud) 我正在学习 Rust,特别是并行的多线程和异步请求。
我阅读了文档,但仍然不明白我在哪里犯了错误。我想我知道在哪里,但不知道如何解决它。
主程序.rs
use std::thread;
struct Request {
url: String,
}
impl Request {
fn new(name: &str) -> Request {
Request {
url: name.to_string(),
}
}
async fn call(&self, x: &str) -> Result<(), Box<dyn std::error::Error>> {
let resp = reqwest::get(x).await;
Ok(())
}
}
#[tokio::main]
async fn main() {
let requests = vec![
Request::new("https://www.google.com/"),
Request::new("https://www.google.com/"),
];
let handles: Vec<_> = requests
.into_iter()
.map(|request| {
thread::spawn(move || async {
request.call(&request.url).await;
})
})
.collect();
for y in handles {
println!("{:?}", y);
} …Run Code Online (Sandbox Code Playgroud) rust ×12
rust-tokio ×6
future ×5
async-await ×4
asynchronous ×2
actix-web ×1
hyper ×1
reqwest ×1
rust-diesel ×1
select ×1
syntax ×1
terminate ×1
threadpool ×1