My goal is to run N functions concurrently, but I don't want to spawn any more until all of them have finished. This is what I have so far:
extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::process::Command; // needed for Command::new below
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));
    tokio::run(task);
}
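I'm spawning 5 functions every second, but now I'd like to wait until all of them have finished before spawning more.
As I understand it (and I may well have the idea wrong), I'm returning a Future inside another future.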
I'm stuck trying to wait for the inner futures to finish.
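You can achieve this by joining your worker futures, so that they all run concurrently but must all finish together. You can then join that result with a 1-second delay for the same reason: the next round starts only once both the batch and the delay are done. Wrap that in a loop to run it forever (or for 5 iterations, as in the demo).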
With Tokio 1.3 and futures 0.3:

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let now = Instant::now();
    let forever = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());
        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());
        // Resolves when both all pages and a delay of 1 second is done
        future::join(batch_of_pages, time::sleep(Duration::from_secs(1))).await;
        Some(((), ()))
    });
    forever.take(5).for_each(|_| async {}).await;
    eprintln!("Took {:?}", now.elapsed());
}

fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    vec![page("a", 100).boxed(), page("b", 200).boxed()]
}

async fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);
    time::sleep(Duration::from_millis(time_ms)).await;
    eprintln!("page {} done", name);
}
Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s
With Tokio 0.1 and futures 0.1:

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay}; // 0.1.18

fn main() {
    let repeat_count = Some(5);
    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());
        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());
        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));
        // Run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });
    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    future::ok(())
        .inspect(move |_| eprintln!("page {} starting", name))
        .and_then(move |_| ez_delay_ms(time_ms))
        .inspect(move |_| eprintln!("page {} done", name))
}

fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
}
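
The work in the question is blocking (it shells out to `sleep`), so it may help to see the same "start a batch, wait for all of it, then wait out the rest of the period" shape without futures at all. The following is only a minimal sketch using plain `std::thread`, not the Tokio approach above; `blocking_job` and `run_batches` are hypothetical names made up for this illustration, and the short sleeps stand in for real work.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one unit of blocking work.
fn blocking_job(id: usize) -> usize {
    thread::sleep(Duration::from_millis(10 * (id as u64 + 1)));
    id
}

/// Runs `batches` rounds. Each round starts `workers` threads and ends only
/// once all of them have finished AND at least `min_period` has elapsed.
fn run_batches(batches: usize, workers: usize, min_period: Duration) -> Vec<Vec<usize>> {
    let mut results = Vec::with_capacity(batches);
    for _ in 0..batches {
        let started = Instant::now();

        // Start the whole batch first so the jobs overlap in time.
        let handles: Vec<_> = (0..workers)
            .map(|id| thread::spawn(move || blocking_job(id)))
            .collect();

        // Joining every handle is the "wait for all of them" step.
        let finished: Vec<usize> = handles
            .into_iter()
            .map(|handle| handle.join().expect("worker panicked"))
            .collect();

        // Sleep for whatever is left of the period, if anything is left.
        if let Some(rest) = min_period.checked_sub(started.elapsed()) {
            thread::sleep(rest);
        }
        results.push(finished);
    }
    results
}

fn main() {
    let now = Instant::now();
    let results = run_batches(3, 5, Duration::from_millis(100));
    eprintln!("{:?} in {:?}", results, now.elapsed());
}
```

Collecting the handles before joining any of them matters here: joining inside the same iterator chain that spawns would run the jobs one after another instead of as a batch.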