如何使用 Tokio 并发运行一组函数,同时避免同一个函数被并发运行多次?

nba*_*ari 4 rust rust-tokio

我的目标是同时运行 N 个函数,但在所有函数完成之前不想生成更多函数。这是我到目前为止所拥有的代码:

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

/// Spawns a batch of five tasks on every tick of a one-second interval.
///
/// Nothing here waits for a batch to finish: the `for_each` closure returns
/// `Ok(())` right after spawning, so the next tick spawns five more tasks
/// whether or not the previous ones are still running.
fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    // `Command` is not imported by this snippet, so it is
                    // spelled with its full path to make the code compile.
                    std::process::Command::new("sleep")
                        .arg("3")
                        .output()
                        .expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)

我每秒生成 5 个函数,但现在我想等到所有函数完成后再生成更多函数。

根据我的理解(我的想法可能是错误的),我需要在一个 future 中返回另一个 `Future`:

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

/// Spawns a batch of five tasks on every tick of a one-second interval.
///
/// Nothing here waits for a batch to finish: the `for_each` closure returns
/// `Ok(())` right after spawning, so the next tick spawns five more tasks
/// whether or not the previous ones are still running.
fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    // `Command` is not imported by this snippet, so it is
                    // spelled with its full path to make the code compile.
                    std::process::Command::new("sleep")
                        .arg("3")
                        .output()
                        .expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)

我卡在了如何等待内部的 future 完成这一步。

She*_*ter 6

您可以通过把所有工作 future 进行 join(合并)来实现这一点:它们会全部并发运行,但必须全部完成后才算结束。然后,出于同样的原因,您可以再把合并后的 future 与一个 1 秒的延迟 join 在一起。最后把这一切包装进一个循环中,让它永远运行(在演示中只迭代 5 次)。

Tokio 1.3

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    /// Runs one round: starts every page and completes only once all pages
    /// are done and a one-second delay has elapsed.
    async fn run_round() {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Completes when every page has finished.
        let pages = future::join_all(all_pages());
        // Lower bound on how long a single round takes.
        let min_round_time = time::sleep(Duration::from_secs(1));

        future::join(pages, min_round_time).await;
    }

    let started = Instant::now();

    // Endless stream of rounds; each item is produced only after the
    // previous round has fully completed.
    let rounds = stream::unfold((), |()| async {
        run_round().await;
        Some(((), ()))
    });

    // Only five rounds are driven for the demo.
    rounds.take(5).for_each(|()| future::ready(())).await;

    eprintln!("Took {:?}", started.elapsed());
}

/// Builds the boxed futures for one batch: page "a" (100 ms) and page "b" (200 ms).
fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    // (page name, delay in milliseconds)
    const PAGES: [(&str, u64); 2] = [("a", 100), ("b", 200)];

    PAGES
        .iter()
        .map(|&(name, time_ms)| page(name, time_ms).boxed())
        .collect()
}

/// Simulates handling one page: logs the start, sleeps asynchronously for
/// `time_ms` milliseconds, then logs completion.
async fn page(name: &'static str, time_ms: u64) {
    let pause = Duration::from_millis(time_ms);

    eprintln!("page {} starting", name);
    time::sleep(pause).await;
    eprintln!("page {} done", name);
}
Run Code Online (Sandbox Code Playgroud)
Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s
Run Code Online (Sandbox Code Playgroud)

Tokio 0.1

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay};  // 0.1.18

/// Runs batches of pages back to back on the Tokio runtime.
///
/// A batch ends only when all of its pages are done and a one-second delay has
/// elapsed. `repeat_count` is the number of batches to run; `None` loops forever.
fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Decide whether to run all this again. The batch that just finished
        // counts towards the total, so stop as soon as it was the last one
        // requested; breaking only at `Some(0)` would run one batch too many.
        wait.map(move |_| match repeat_count {
            None => Loop::Continue(None),
            Some(remaining) if remaining <= 1 => Loop::Break(()),
            Some(remaining) => Loop::Continue(Some(remaining - 1)),
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

/// Simulates handling one page: logs the start, waits `time_ms` milliseconds
/// via `ez_delay_ms`, then logs completion.
fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    // Start from an already-resolved future so the "starting" log runs when
    // the future is polled, not when `page` is called.
    let announced = future::ok(()).inspect(move |_| eprintln!("page {} starting", name));
    let delayed = announced.and_then(move |()| ez_delay_ms(time_ms));
    delayed.inspect(move |_| eprintln!("page {} done", name))
}

/// Returns a future that resolves `ms` milliseconds from now; any timer error
/// is discarded and reported as `()`.
fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    let deadline = Instant::now() + Duration::from_millis(ms);
    Delay::new(deadline).map_err(|_timer_error| ())
}
Run Code Online (Sandbox Code Playgroud)
use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay};  // 0.1.18

/// Runs batches of pages back to back on the Tokio runtime.
///
/// A batch ends only when all of its pages are done and a one-second delay has
/// elapsed. `repeat_count` is the number of batches to run; `None` loops forever.
fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Decide whether to run all this again. The batch that just finished
        // counts towards the total, so stop as soon as it was the last one
        // requested; breaking only at `Some(0)` would run one batch too many.
        wait.map(move |_| match repeat_count {
            None => Loop::Continue(None),
            Some(remaining) if remaining <= 1 => Loop::Break(()),
            Some(remaining) => Loop::Continue(Some(remaining - 1)),
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

/// Simulates handling one page: logs the start, waits `time_ms` milliseconds
/// via `ez_delay_ms`, then logs completion.
fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    // Start from an already-resolved future so the "starting" log runs when
    // the future is polled, not when `page` is called.
    let announced = future::ok(()).inspect(move |_| eprintln!("page {} starting", name));
    let delayed = announced.and_then(move |()| ez_delay_ms(time_ms));
    delayed.inspect(move |_| eprintln!("page {} done", name))
}

/// Returns a future that resolves `ms` milliseconds from now; any timer error
/// is discarded and reported as `()`.
fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    let deadline = Instant::now() + Duration::from_millis(ms);
    Delay::new(deadline).map_err(|_timer_error| ())
}
Run Code Online (Sandbox Code Playgroud)

也可以看看: