dpc*_*.pw 8 parallel-processing multithreading rust
我有一个MyReader实现Iterator并生成Buffers的地方Buffer : Send.很快就会MyReader生成很多东西Buffer,但我有一个CPU密集型的工作要对每个Buffer(.map(|buf| ...))这是我的瓶颈,然后收集结果(有序).我希望将CPU密集型工作并行化 - 希望能够使用工作窃取来执行它们,以及核心数量允许的速度.
编辑:更准确.我正在努力rdedup.MyStruct是Chunker哪些读取io::Read(通常是stdio),查找数据的部分(块)并产生它们.然后map()假设,对于每个块,计算它的sha256摘要,压缩,加密,保存并返回摘要作为结果map(...).已保存数据的摘要用于构建index数据.正在处理的块之间的顺序map(...)无关紧要,但是从每个块 返回的摘要map(...)需要以与找到块相同的顺序收集.实际save到文件步骤被卸载到另一个线程(写入线程).有问题的PR的实际代码
我希望我可以使用rayon它,但rayon期望一个已经可以并行的迭代器 - 例如.一个Vec<...>或类似的东西.我发现没有办法得到一个par_iter从MyReader-我的读者是非常单线程的性质.
有simple_parallel,但文件说,它不推荐用于一般用途.我想确保一切都能正常运作.
我可以采用spmc队列实现和自定义thread_pool,但我正在寻找经过优化和测试的现有解决方案.
还有pipeliner但不支持有序地图.
一般而言,就并行化而言,保留顺序是一个非常艰难的要求.
您可以尝试使用典型的扇出/扇入设置进行手工制作:
或者你可以提高抽象水平.
特别感兴趣的是:Future.
A Future表示计算的结果,其可能已经发生或可能尚未发生.接收有序列表的消费者Future可以简单地等待每个消息,并且让缓冲在队列中自然发生.
对于奖励积分,如果您使用固定大小的队列,您将自动对消费者施加压力.
因此我建议建立一些东西CpuPool.
设置将是:
use std::sync::mpsc::{Receiver, Sender};
fn produce(sender: Sender<...>) {
let pool = CpuPool::new_num_cpus();
for chunk in reader {
let future = pool.spawn_fn(|| /* do work */);
sender.send(future);
}
// Dropping the sender signals there's no more work to consumer
}
fn consume(receiver: Receiver<...>) {
while let Ok(future) = receiver.recv() {
let item = future.wait().expect("Computation Error?");
/* do something with item */
}
}
fn main() {
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || consume(receiver));
produce(sender);
}
Run Code Online (Sandbox Code Playgroud)