如何在Rust中的自定义单线程迭代器上并行`map(...)`?

dpc*_*.pw 8 parallel-processing multithreading rust

我有一个MyReader实现Iterator并生成Buffers的地方Buffer : Send.很快就会MyReader生成很多东西Buffer,但我有一个CPU密集型的工作要对每个Buffer(.map(|buf| ...))这是我的瓶颈,然后收集结果(有序).我希望将CPU密集型工作并行化 - 希望能够使用工作窃取来执行它们,以及核心数量允许的速度.

编辑:更准确.我正在努力rdedup.MyStructChunker哪些读取io::Read(通常是stdio),查找数据的部分(块)并产生它们.然后map()假设,对于每个块,计算它的sha256摘要,压缩,加密,保存并返回摘要作为结果map(...).已保存数据的摘要用于构建index数据.正在处理的块之间的顺序map(...)无关紧要,但是从每个块 返回的摘要map(...)需要以与找到块相同的顺序收集.实际save到文件步骤被卸载到另一个线程(写入线程).有问题的PR的实际代码

我希望我可以使用rayon它,但rayon期望一个已经可以并行的迭代器 - 例如.一个Vec<...>或类似的东西.我发现没有办法得到一个par_iterMyReader-我的读者是非常单线程的性质.

simple_parallel,但文件说,它不推荐用于一般用途.我想确保一切都能正常运作.

我可以采用spmc队列实现和自定义thread_pool,但我正在寻找经过优化和测试的现有解决方案.

还有pipeliner但不支持有序地图.

Mat*_* M. 5

一般而言,就并行化而言,保留顺序是一个非常艰难的要求.

您可以尝试使用典型的扇出/扇入设置进行手工制作:

  • 单个生产者,用顺序单调递增的ID标记输入,
  • 一个线程池,它从该生产者那里消费,然后将结果发送给最终的消费者,
  • 缓冲和重新排序结果的消费者,以便按顺序处理它们.

或者你可以提高抽象水平.


特别感兴趣的是:Future.

A Future表示计算的结果,其可能已经发生或可能尚未发生.接收有序列表的消费者Future可以简单地等待每个消息,并且让缓冲在队列中自然发生.

对于奖励积分,如果您使用固定大小的队列,您将自动对消费者施加压力.


因此我建议建立一些东西CpuPool.

设置将是:

use std::sync::mpsc::{Receiver, Sender};

fn produce(sender: Sender<...>) {
    let pool = CpuPool::new_num_cpus();

    for chunk in reader {
        let future = pool.spawn_fn(|| /* do work */);
        sender.send(future);
    }

    // Dropping the sender signals there's no more work to consumer
}

fn consume(receiver: Receiver<...>) {
    while let Ok(future) = receiver.recv() {
        let item = future.wait().expect("Computation Error?");

        /* do something with item */
    }
}

fn main() {
    let (sender, receiver) = std::sync::mpsc::channel();

    std::thread::spawn(move || consume(receiver));

    produce(sender);
}
Run Code Online (Sandbox Code Playgroud)