为什么 Rust 的人造丝使用 Arc<Mutex<anyhow::Result<()>>> 需要更长的时间?

Jim*_*Jim 2 parallel-processing rust rayon

我正在尝试使用 crate 并行化我的代码rayon。该过程是读取一个文件,对其进行处理并输出处理后的文件。

我想记下每个文件的处理结果,以便我有一个Arc<Mutex<Vec<anyhow::Result<()>>>>锁定并推送anyhow::Result<()>一个文件处理结果的每个结果。

fn main() {
    let (mut files, _) = utils::get_files_from_folder(input_folder)?;

    let results = Arc::new(Mutex::new(Vec::<anyhow::Result<()>>::new()));

    files.par_iter_mut().for_each(|path| {
        
        if let Some(extension) = path.extension() {
            if extension == "txt" {
                let result = redact::redact_txt_and_write_json(path, &regex_vec, &output_folder); // processing done here
                results.lock().expect("`results` cannot be locked").push(result); // lock the mutex and push done here
            } else {
                eprintln!(
                    "{}INVALID EXTENSION: {} - Not yet implemented",
                    *RED_ERROR_STRING,
                    extension.to_string_lossy(),
                );
                std::process::exit(1);
            };
            ()
        } else {
            eprintln!("{}EXTENSION not found", *RED_ERROR_STRING);
            std::process::exit(1);
        }
    }); // end of for_each
    println!(
        "{:?}", results.as_ref()
    );
    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

我的问题是:为什么使用锁定显然比不使用锁定需要更长的时间?

带锁定:

Finished dev [unoptimized + debuginfo] target(s) in 1m 34s
Run Code Online (Sandbox Code Playgroud)

不加锁:

Finished dev [unoptimized + debuginfo] target(s) in 0.30s
Run Code Online (Sandbox Code Playgroud)

Fin*_*nis 5

您的最小示例非常复杂且不可重现,因此我重写了它以演示您遇到的问题:

use std::{
    sync::{Arc, Mutex},
    time::Instant,
};

use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};

struct SomeResult(u32);

fn do_some_processing(arg: u32) -> SomeResult {
    SomeResult(arg * 2)
}

fn main() {
    let input_data = (1..10000000).collect::<Vec<_>>();

    let t_start = Instant::now();
    let results = Arc::new(Mutex::new(Vec::<SomeResult>::new()));

    input_data.par_iter().for_each(|value| {
        let result = do_some_processing(*value);
        results
            .lock()
            .expect("`results` cannot be locked")
            .push(result); // lock the mutex and push done here
    });
    let t_end = Instant::now();

    println!("Result item count: {}", results.lock().unwrap().len());
    println!("Time taken: {} ms", (t_end - t_start).as_millis());
}
Run Code Online (Sandbox Code Playgroud)
Result item count: 9999999
Time taken: 116 ms
Run Code Online (Sandbox Code Playgroud)

AMutex并不是这里的最佳数据结构。锁定意味着通过使用实现的并行化rayon在将结果添加到 时必须再次序列化Vec,这成为主要瓶颈。只有一个线程可以push同时执行操作,所有其他线程在.lock()函数内等待一个线程完成,直到允许下一个线程。基本上,他们一直在等待对方。

rayon有针对此用例的解决方案。它的并行迭代器支持.collect(),这与将每个项目放入向量中相同,只是没有锁并且高度并行。

有了它.collect(),我可以将示例中的时间从 减少116 ms3 ms

Result item count: 9999999
Time taken: 116 ms
Run Code Online (Sandbox Code Playgroud)
Result item count: 9999999
Time taken: 3 ms
Run Code Online (Sandbox Code Playgroud)