我想使用 Rust 创建一个简单的调度程序,以便在定义的时间运行多个并发函数,但如果它们尚未完成,则不要启动更多函数。
例如,如果定义的间隔为一秒,则调度程序应运行这些函数,并且如果先前的函数尚未返回,则不再启动更多函数。目标是防止多次运行相同的函数。
我用 Go 创建了一个工作示例,如下所示:
package main
import (
"fmt"
"sync"
"time"
)
func myFunc(wg *sync.WaitGroup) {
fmt.Printf("now: %+s\n", time.Now())
time.Sleep(3 * time.Second)
wg.Done()
}
func main() {
quit := make(chan bool)
t := time.NewTicker(time.Second)
go func() {
for {
select {
case <-t.C:
var wg sync.WaitGroup
for i := 0; i <= 4; i++ {
wg.Add(1)
go myFunc(&wg)
}
wg.Wait()
fmt.Printf("--- done ---\n\n")
case <-quit:
return
}
}
}()
<-time.After(time.Minute)
close(quit)
}
Run Code Online (Sandbox Code Playgroud)
NewTicker
由于我在 Rust 标准库中没有找到类似 Go 的东西,所以我使用了Tokio并想出了这个
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;
fn main() {
let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
.for_each(|interval| {
println!("Interval: {:?}", interval);
for i in 0..5 {
tokio::spawn(lazy(move || {
println!("I am i: {}", i);
thread::sleep(time::Duration::from_secs(3));
Ok(())
}));
}
Ok(())
})
.map_err(|e| panic!("interval errored; err={:?}", e));
tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)
我使用这种方法的问题是,任务不会等待之前的函数被调用,因此无论之前它们是否正在运行,函数都会再次启动,我在这里缺少类似 Go 的东西sync.WaitGroup
。可以使用什么来实现与工作示例中相同的结果?
仅使用标准库是否可以实现这一目标?这主要是为了学习目的,可能有一种非常简单的方法可以避免额外的复杂性。
最后,我想通过 HTTP 定期监控一些站点(仅获取返回的状态代码),但在获得所有响应之前不要再次查询所有站点。
既然你想要并发并且只会使用标准库,那么你基本上必须使用线程。
在这里,我们为调度程序循环的每次迭代的每个函数启动一个线程,允许它们并行运行。然后,我们等待所有函数完成,以防止同时运行同一函数两次。
use std::{
thread,
time::{Duration, Instant},
};
fn main() {
let scheduler = thread::spawn(|| {
let wait_time = Duration::from_millis(500);
// Make this an infinite loop
// Or some control path to exit the loop
for _ in 0..5 {
let start = Instant::now();
eprintln!("Scheduler starting at {:?}", start);
let thread_a = thread::spawn(a);
let thread_b = thread::spawn(b);
thread_a.join().expect("Thread A panicked");
thread_b.join().expect("Thread B panicked");
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
eprintln!(
"schedule slice has time left over; sleeping for {:?}",
remaining
);
thread::sleep(remaining);
}
}
});
scheduler.join().expect("Scheduler panicked");
}
fn a() {
eprintln!("a");
thread::sleep(Duration::from_millis(100))
}
fn b() {
eprintln!("b");
thread::sleep(Duration::from_millis(200))
}
Run Code Online (Sandbox Code Playgroud)
您还可以使用 aBarrier
来启动线程中的每个函数一次,然后在执行结束时同步所有函数:
use std::{
sync::{Arc, Barrier},
thread,
time::Duration,
};
fn main() {
let scheduler = thread::spawn(|| {
let barrier = Arc::new(Barrier::new(2));
fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
move || {
// Make this an infinite loop
// Or some control path to exit the loop
for _ in 0..5 {
f();
barrier.wait();
}
}
}
let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
let thread_b = thread::spawn(with_barrier(barrier.clone(), b));
thread_a.join().expect("Thread A panicked");
thread_b.join().expect("Thread B panicked");
});
scheduler.join().expect("Scheduler panicked");
}
fn a() {
eprintln!("a");
thread::sleep(Duration::from_millis(100))
}
fn b() {
eprintln!("b");
thread::sleep(Duration::from_millis(200))
}
Run Code Online (Sandbox Code Playgroud)
就我个人而言,我不会使用这两种解决方案。我会找到一个箱子,其中其他人已经编写并测试了我需要的代码。
也可以看看:
归档时间: |
|
查看次数: |
15268 次 |
最近记录: |