随着时间的推移,多线程严重的性能下降:我错过了什么?

Jon*_*ght 6 multithreading rust

在我的应用程序中,一个方法一旦启动就运行得很快,但在接近完成时性能开始不断下降,这似乎与工作量(每个线程必须执行的函数的迭代次数)无关。一旦接近尾声,与之前相比,它的速度会减慢到令人难以置信的慢(值得注意的是,这不仅是由于剩余未完成的线程较少,似乎每个线程都变慢了)。

我无法弄清楚为什么会发生这种情况,所以我在问。我究竟做错了什么?


CPU使用率概览:

问题的幻灯片

值得注意的是,CPU 温度始终保持较低。

这个阶段随着设置的工作量而变化,更多的工作会产生更好的外观,所有线程始终接近 100%。不过,在这一刻,这看起来不错。

在此处输入图片说明

在这里我们看到了之前的持续表现,

在此处输入图片说明

在这里,我们看到它开始退化。我不知道为什么会发生这种情况。

在此处输入图片说明

经过一段时间的混乱之后,大多数线程完成了它们的工作,其余线程继续运行,此时虽然看起来它们已达到 100%,但实际上它们执行剩余工作量的速度非常缓慢。我不明白为什么会发生这种情况。

在此处输入图片说明


印刷进度

我编写了一个多线程random_search文档链接)函数进行优化。此函数的大部分复杂性来自打印数据在线程之间传递数据,这支持提供显示进度的输出,例如:

2300
 565 (24.57%) 00:00:11 / 00:00:47 [25.600657363049734] { [563.0ns, 561.3ms, 125.0ns, 110.0ns] [2.0µs, 361.8ms, 374.0ns, 405.0ns] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
Run Code Online (Sandbox Code Playgroud)

我一直在尝试使用此输出来找出出了什么问题,但我不知道。

此输出描述:

  • 总迭代次数2300
  • 当前迭代的总数565
  • 运行时间00:00:11mm:ss:ms)。
  • 预计剩余时间00:00:47( mm:ss:ms)。
  • 当前最佳值[25.600657363049734]
  • 最近测量的执行位置之间的时间(线程从某行到另一行(update_execution_position在下面的代码中具体定义)所花费的有效时间[563.0ns, 561.3ms, 125.0ns, 110.0ns]
  • 执行位置之间的平均时间(这是整个运行时的平均值,而不是自上次测量以来的平均值)[2.0µs, 361.8ms, 374.0ns, 405.0ns]
  • 线程的执行位置(0当一个线程完成时,rest 代表一个线程已经命中了某行,触发了这个设置,但还没有命中下一行改变它,实际上是在 2 个位置之间)[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

random_search代码:

鉴于我已经在我的库中的其他方法测试的实施grid_searchsimulated_annealing它会建议我的问题不完全ATLEAST居住在random_search.rs

random_search.rs

2300
 565 (24.57%) 00:00:11 / 00:00:47 [25.600657363049734] { [563.0ns, 561.3ms, 125.0ns, 110.0ns] [2.0µs, 361.8ms, 374.0ns, 405.0ns] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
Run Code Online (Sandbox Code Playgroud)

util.rs

pub fn update_execution_position<const N: usize>(
    i: usize,
    execution_position_timer: Instant,
    thread_execution_position: &Arc<AtomicU8>,
    thread_execution_times: &Arc<[Mutex<(Duration, u64)>; N]>,
) -> Instant {
    {
        let mut data = thread_execution_times[i - 1].lock().unwrap();
        data.0 += execution_position_timer.elapsed();
        data.1 += 1;
    }
    thread_execution_position.store(i as u8, Ordering::SeqCst);
    Instant::now()
}

pub struct Polling {
    pub poll_rate: u64,
    pub printing: bool,
    pub early_exit_minimum: Option<f64>,
    pub thread_execution_reporting: bool,
}
impl Polling {
    const DEFAULT_POLL_RATE: u64 = 10;
    pub fn new(printing: bool, early_exit_minimum: Option<f64>) -> Self {
        Self {
            poll_rate: Polling::DEFAULT_POLL_RATE,
            printing,
            early_exit_minimum,
            thread_execution_reporting: false,
        }
    }
}

pub fn poll<const N: usize>(
    data: Polling,
    // Current count of each thread.
    counters: Vec<Arc<AtomicU64>>,
    offset: u64,
    // Final total iterations.
    iterations: u64,
    // Best values of each thread.
    thread_bests: Vec<Arc<Mutex<f64>>>,
    // Early exit switch.
    thread_exit: Arc<AtomicBool>,
    // Current positions of execution of each thread.
    thread_execution_positions: Vec<Arc<AtomicU8>>,
    // Current average times between execution positions for each thread
    thread_execution_times: Vec<Arc<[Mutex<(Duration, u64)>; N]>>,
) {
    let start = Instant::now();
    let mut stdout = stdout();
    let mut count = offset
        + counters
            .iter()
            .map(|c| c.load(Ordering::SeqCst))
            .sum::<u64>();

    if data.printing {
        println!("{:20}", iterations);
    }

    let mut poll_time = Instant::now();
    let mut held_best: f64 = f64::MAX;

    let mut held_average_execution_times: [(Duration, u64); N] =
        vec![(Duration::new(0, 0), 0); N].try_into().unwrap();
    let mut held_recent_execution_times: [Duration; N] =
        vec![Duration::new(0, 0); N].try_into().unwrap();
    while count < iterations {
        if data.printing {
            // loop {
            let percent = count as f32 / iterations as f32;

            // If count == 0, give 00... for remaining time as placeholder
            let remaining_time_estimate = if count == 0 {
                Duration::new(0, 0)
            } else {
                start.elapsed().div_f32(percent)
            };
            print!(
                "\r{:20} ({:.2}%) {} / {} [{}] {}\t",
                count,
                100. * percent,
                print_duration(start.elapsed(), 0..3),
                print_duration(remaining_time_estimate, 0..3),
                if held_best == f64::MAX {
                    String::from("?")
                } else {
                    format!("{}", held_best)
                },
                if data.thread_execution_reporting {
                    let (average_execution_times, recent_execution_times): (
                        Vec<String>,
                        Vec<String>,
                    ) = (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )
                        })
                        .unzip();

                    let execution_positions: Vec<u8> = thread_execution_positions
                        .iter()
                        .map(|pos| pos.load(Ordering::SeqCst))
                        .collect();
                    format!(
                        "{{ [{}] [{}] {:.?} }}",
                        recent_execution_times.join(", "),
                        average_execution_times.join(", "),
                        execution_positions
                    )
                } else {
                    String::from("")
                }
            );
            stdout.flush().unwrap();
        }

        // Updates best and does early exiting
        match (data.early_exit_minimum, data.printing) {
            (Some(early_exit), true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                        if thread_best_temp <= early_exit {
                            thread_exit.store(true, Ordering::SeqCst);
                            println!();
                            return;
                        }
                    }
                }
            }
            (None, true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                    }
                }
            }
            (Some(early_exit), false) => {
                for thread_best in thread_bests.iter() {
                    if *thread_best.lock().unwrap() <= early_exit {
                        thread_exit.store(true, Ordering::SeqCst);
                        return;
                    }
                }
            }
            (None, false) => {}
        }

        thread::sleep(saturating_sub(
            Duration::from_millis(data.poll_rate),
            poll_time.elapsed(),
        ));
        poll_time = Instant::now();

        count = offset
            + counters
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .sum::<u64>();
    }

    if data.printing {
        println!(
            "\r{:20} (100.00%) {} / {} [{}] {}\t",
            count,
            print_duration(start.elapsed(), 0..3),
            print_duration(start.elapsed(), 0..3),
            held_best,
            if data.thread_execution_reporting {
                let (average_execution_times, recent_execution_times): (Vec<String>, Vec<String>) =
                    (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )

rod*_*igo 6

一些基本的调试(又名println!无处不在)表明您的性能问题根本与多线程无关。它只是随机发生,当有 24 个线程在做它们的工作时,一个随机停止的事实并不明显,但是当只剩下一两个线程时,它们会显得很慢。

但是这个性能瓶颈在哪里呢?好吧,您是在代码中自己说明的:在binary_buffer您说:

    // Gets average luma in local fields
    // O((s+r)^2*(n/s)*(m/s)) : s = local field size, r = local field reach
Run Code Online (Sandbox Code Playgroud)

的值sr似乎之间的随机值0255,而n是一个图像行的长度,以字节为单位3984 * 3 = 11952,并且m是行数2271

现在,大多数时候O()大约是几百万,相当容易管理。但是如果s碰巧又小又r大,比如(3, 200)那么计算的次数就爆炸了1e11

幸运的是,我认为您可以在原始调用中定义这些值的范围,random_search因此在那里进行一些调整应该会使您恢复到合理的复杂性。将范围更改为:

[0..255, 0..255, 0..255, 1..255, 20..255],
//                               ^ here
Run Code Online (Sandbox Code Playgroud)

似乎对我有用。

PS:开头的这些行binary_buffer是发现这一点的关键:

    let o = (i_size / local_field_size) * (j_size / local_field_size) * (local_field_size + local_field_reach).pow(2);
    println!("\nO() = {}", o);
Run Code Online (Sandbox Code Playgroud)