Jon*_*ght 6 multithreading rust
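In my application, one method runs quickly once it starts, but as it nears completion its performance steadily declines, and this seems to happen regardless of the workload (the number of iterations of the function each thread has to execute). Once it gets close to the end it slows to an incredible crawl compared to before. Notably, this is not just because fewer unfinished threads remain: each individual thread seems to slow down.
I can't work out why this happens, so I'm asking: what am I doing wrong?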
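Slideshow of the problem:
Notably, the CPU temperature stays low throughout.
The length of this phase varies with the configured workload; more work makes it look better, with all threads consistently near 100%. At this point, though, everything looks fine.
Here we see the steady performance from before.
Here we see it begin to degrade. I don't know why this happens.
After a period of chaos, most threads finish their work and the rest keep running. At this point, although they appear to be at 100%, they are actually getting through the remaining workload very slowly. I don't understand why this happens.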
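I wrote a multithreaded random_search (documentation link) function for optimization. Most of this function's complexity comes from printing and from passing data between threads, which supports output showing progress, such as: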
2300
565 (24.57%) 00:00:11 / 00:00:47 [25.600657363049734] { [563.0ns, 561.3ms, 125.0ns, 110.0ns] [2.0µs, 361.8ms, 374.0ns, 405.0ns] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
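I have been trying to use this output to work out what is going wrong, but I can't.
Description of this output:
- 2300: total number of iterations.
- 565 (24.57%): iterations completed so far.
- 00:00:11 (mm:ss:ms): time elapsed.
- 00:00:47 (mm:ss:ms): estimated total run time (elapsed time divided by the fraction completed).
- [25.600657363049734]: best value found so far.
- [563.0ns, 561.3ms, 125.0ns, 110.0ns]: most recent average time between consecutive execution positions, i.e. the effective time a thread spends getting from one position to the next (as defined by update_execution_position in the code below).
- [2.0µs, 361.8ms, 374.0ns, 405.0ns]: average time between consecutive execution positions over the whole run.
- [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]: current execution position of each thread. 0 means the thread has finished; any other value means the thread has hit the line that sets that value but not yet the next line that changes it, i.e. it is effectively between two positions.
As for the random_search code: since I have already tested this implementation in other methods of my library, grid_search and simulated_annealing, the problem does not seem to lie, at least not entirely, in random_search.rs.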
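The progress line is driven by worker threads bumping per-thread counters while a separate thread polls and sums them. The block below is a minimal sketch of that pattern, assuming Rust 1.66+ (for `std::thread::scope` and `std::hint::black_box`); `run_with_progress` and `estimate_total` are hypothetical names, and the busy loop is only a stand-in for the real objective function. Like `start.elapsed().div_f32(percent)` in `poll`, the estimate here is elapsed time divided by the fraction completed, so it approximates the total run time rather than the time remaining.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Estimated total run time: elapsed time scaled by total / done.
/// Returns None before any iteration has finished (no basis for an estimate).
fn estimate_total(elapsed: Duration, done: u64, total: u64) -> Option<Duration> {
    if done == 0 {
        return None;
    }
    Some(elapsed.mul_f64(total as f64 / done as f64))
}

/// Hypothetical driver: `threads` workers each run `per_thread` dummy
/// iterations while the calling thread polls their counters.
/// Returns the final summed count.
fn run_with_progress(threads: usize, per_thread: u64) -> u64 {
    let counters: Vec<Arc<AtomicU64>> =
        (0..threads).map(|_| Arc::new(AtomicU64::new(0))).collect();
    let total = threads as u64 * per_thread;
    let start = Instant::now();

    thread::scope(|s| {
        for counter in &counters {
            let counter = Arc::clone(counter);
            s.spawn(move || {
                for _ in 0..per_thread {
                    // Stand-in for one evaluation of the function being optimized.
                    std::hint::black_box((0..1_000u64).sum::<u64>());
                    // Relaxed is enough: the counter is only read for progress display.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }

        // Poller: sum all counters every 10 ms until the work is done.
        loop {
            let count: u64 = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum();
            if let Some(est) = estimate_total(start.elapsed(), count, total) {
                eprint!("\r{count}/{total} est. total {est:.1?}   ");
            }
            if count >= total {
                eprintln!();
                break;
            }
            thread::sleep(Duration::from_millis(10));
        }
    });

    // All workers have been joined by the end of the scope.
    counters.iter().map(|c| c.load(Ordering::Relaxed)).sum()
}

fn main() {
    let done = run_with_progress(4, 2_000);
    println!("finished {done} iterations");
}
```

Using `thread::scope` here just avoids manual `JoinHandle` bookkeeping; the shared `Arc<AtomicU64>` counters mirror the `counters: Vec<Arc<AtomicU64>>` parameter of `poll`.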
random_search.rs:
util.rs:
use std::io::{stdout, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

pub fn update_execution_position<const N: usize>(
    i: usize,
    execution_position_timer: Instant,
    thread_execution_position: &Arc<AtomicU8>,
    thread_execution_times: &Arc<[Mutex<(Duration, u64)>; N]>,
) -> Instant {
    // Add the time spent since the previous position to slot i - 1
    // and bump that slot's sample count.
    {
        let mut data = thread_execution_times[i - 1].lock().unwrap();
        data.0 += execution_position_timer.elapsed();
        data.1 += 1;
    }
    // Publish this thread's new position so the polling thread can display it.
    thread_execution_position.store(i as u8, Ordering::SeqCst);
    // Start timing the next segment.
    Instant::now()
}
pub struct Polling {
    pub poll_rate: u64,
    pub printing: bool,
    pub early_exit_minimum: Option<f64>,
    pub thread_execution_reporting: bool,
}
impl Polling {
    const DEFAULT_POLL_RATE: u64 = 10;
    pub fn new(printing: bool, early_exit_minimum: Option<f64>) -> Self {
        Self {
            poll_rate: Polling::DEFAULT_POLL_RATE,
            printing,
            early_exit_minimum,
            thread_execution_reporting: false,
        }
    }
}
pub fn poll<const N: usize>(
    data: Polling,
    // Current count of each thread.
    counters: Vec<Arc<AtomicU64>>,
    offset: u64,
    // Final total iterations.
    iterations: u64,
    // Best values of each thread.
    thread_bests: Vec<Arc<Mutex<f64>>>,
    // Early exit switch.
    thread_exit: Arc<AtomicBool>,
    // Current positions of execution of each thread.
    thread_execution_positions: Vec<Arc<AtomicU8>>,
    // Current average times between execution positions for each thread
    thread_execution_times: Vec<Arc<[Mutex<(Duration, u64)>; N]>>,
) {
    let start = Instant::now();
    let mut stdout = stdout();
    let mut count = offset
        + counters
            .iter()
            .map(|c| c.load(Ordering::SeqCst))
            .sum::<u64>();
    if data.printing {
        println!("{:20}", iterations);
    }
    let mut poll_time = Instant::now();
    let mut held_best: f64 = f64::MAX;
    let mut held_average_execution_times: [(Duration, u64); N] =
        vec![(Duration::new(0, 0), 0); N].try_into().unwrap();
    let mut held_recent_execution_times: [Duration; N] =
        vec![Duration::new(0, 0); N].try_into().unwrap();
    while count < iterations {
        if data.printing {
            // loop {
            let percent = count as f32 / iterations as f32;
            // If count == 0, give 00... for remaining time as placeholder
            let remaining_time_estimate = if count == 0 {
                Duration::new(0, 0)
            } else {
                start.elapsed().div_f32(percent)
            };
            print!(
                "\r{:20} ({:.2}%) {} / {} [{}] {}\t",
                count,
                100. * percent,
                print_duration(start.elapsed(), 0..3),
                print_duration(remaining_time_estimate, 0..3),
                if held_best == f64::MAX {
                    String::from("?")
                } else {
                    format!("{}", held_best)
                },
                if data.thread_execution_reporting {
                    let (average_execution_times, recent_execution_times): (
                        Vec<String>,
                        Vec<String>,
                    ) = (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )
                        })
                        .unzip();
                    let execution_positions: Vec<u8> = thread_execution_positions
                        .iter()
                        .map(|pos| pos.load(Ordering::SeqCst))
                        .collect();
                    format!(
                        "{{ [{}] [{}] {:.?} }}",
                        recent_execution_times.join(", "),
                        average_execution_times.join(", "),
                        execution_positions
                    )
                } else {
                    String::from("")
                }
            );
            stdout.flush().unwrap();
        }
        // Updates best and does early exiting
        match (data.early_exit_minimum, data.printing) {
            (Some(early_exit), true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                        if thread_best_temp <= early_exit {
                            thread_exit.store(true, Ordering::SeqCst);
                            println!();
                            return;
                        }
                    }
                }
            }
            (None, true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                    }
                }
            }
            (Some(early_exit), false) => {
                for thread_best in thread_bests.iter() {
                    if *thread_best.lock().unwrap() <= early_exit {
                        thread_exit.store(true, Ordering::SeqCst);
                        return;
                    }
                }
            }
            (None, false) => {}
        }
        thread::sleep(saturating_sub(
            Duration::from_millis(data.poll_rate),
            poll_time.elapsed(),
        ));
        poll_time = Instant::now();
        count = offset
            + counters
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .sum::<u64>();
    }
    if data.printing {
        println!(
            "\r{:20} (100.00%) {} / {} [{}] {}\t",
            count,
            print_duration(start.elapsed(), 0..3),
            print_duration(start.elapsed(), 0..3),
            held_best,
            if data.thread_execution_reporting {
                let (average_execution_times, recent_execution_times): (Vec<String>, Vec<String>) =
                    (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )
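Some basic debugging (a.k.a. println! everywhere) shows that your performance problem has nothing to do with multithreading at all. It just happens at random: while 24 threads are all working, the fact that one of them randomly stalls is not noticeable, but when only one or two threads are left, they appear slow.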
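A small deterministic model of that effect, as a sketch: it assumes each thread runs a fixed list of tasks with made-up costs (arbitrary time units, not measurements from the question) and that wall-clock time is simply the latest finish time. `finish_times` and `busy_at` are hypothetical helpers.

```rust
/// Each inner Vec is the list of task costs one thread must run, in order.
/// A thread's finish time is the sum of its task costs.
fn finish_times(per_thread_costs: &[Vec<u64>]) -> Vec<u64> {
    per_thread_costs
        .iter()
        .map(|costs| costs.iter().sum::<u64>())
        .collect()
}

/// Number of threads still working at time `t`.
fn busy_at(finish: &[u64], t: u64) -> usize {
    finish.iter().filter(|&&f| f > t).count()
}

fn main() {
    // 24 threads with 100 tasks each; almost every task costs 1 time unit...
    let mut costs = vec![vec![1u64; 100]; 24];
    // ...but one unlucky parameter draw on thread 7 costs 1000 units.
    costs[7][42] = 1_000;

    let finish = finish_times(&costs);
    let wall = *finish.iter().max().unwrap();
    println!("wall-clock time: {wall}");
    for t in [50, 99, 100, 500, 1_000] {
        println!("t = {t:>4}: {} thread(s) busy", busy_at(&finish, t));
    }
}
```

With one 1000-unit task among otherwise 1-unit tasks, 23 threads are done at t = 100 and the remaining thread runs until t = 1099, so for most of the wall-clock time a single thread is busy. On a CPU monitor that looks like one thread at 100% crawling through its last iterations, even though nothing about threading has slowed down.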
But where is this performance bottleneck? Well, you said it yourself in the code: in binary_buffer you write:
// Gets average luma in local fields
// O((s+r)^2*(n/s)*(m/s)) : s = local field size, r = local field reach
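The values of s and r appear to be random values between 0 and 255, while n is the length of an image row in bytes, 3984 * 3 = 11952, and m is the number of rows, 2271.
Most of the time O() is around a few million, which is quite manageable. But if s happens to be small and r large, say (3, 200), the number of computations explodes to about 1e11!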
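Plugging those numbers into the complexity comment makes the blow-up concrete. Writing ops(s, r) for the estimate and using integer division, as the PS line in binary_buffer does, the pair (3, 200) can be contrasted with the swapped pair (200, 3): the sum s + r is identical, but a small s multiplies both the row and the column block counts, so the cost grows roughly like 1/s².

```latex
\begin{aligned}
\mathrm{ops}(s, r) &= (s+r)^2 \left\lfloor \frac{n}{s} \right\rfloor \left\lfloor \frac{m}{s} \right\rfloor, \qquad n = 11952,\; m = 2271 \\
\mathrm{ops}(3, 200) &= 203^2 \cdot 3984 \cdot 757 = 124\,281\,728\,592 \approx 1.2 \times 10^{11} \\
\mathrm{ops}(200, 3) &= 203^2 \cdot 59 \cdot 11 = 26\,744\,641 \approx 2.7 \times 10^{7}
\end{aligned}
```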
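Fortunately, I think you can set the ranges of these values in the original call to random_search, so some tweaking there should bring you back to a reasonable complexity. Changing the ranges to: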
[0..255, 0..255, 0..255, 1..255, 20..255],
// ^ here
seems to work for me.
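To see how much the lower bound on s matters, the sketch below brute-forces the same O() estimate over every (s, r) pair that could be drawn from half-open ranges (Rust's `a..b` excludes `b`). It assumes n = 11952 and m = 2271 as above and, because which slot of the range array corresponds to s is not spelled out here, simply assumes the raised lower bound of 20 applies to s. `estimated_ops` and `worst_case` are hypothetical helpers that mirror the PS line's integer arithmetic.

```rust
use std::ops::Range;

/// O((s+r)^2 * (n/s) * (m/s)) with integer division, as in the PS line.
/// Returns None for s = 0 (division by zero) or on u64 overflow.
fn estimated_ops(n: u64, m: u64, s: u64, r: u64) -> Option<u64> {
    if s == 0 {
        return None;
    }
    (n / s).checked_mul(m / s)?.checked_mul((s + r).checked_pow(2)?)
}

/// Largest estimate over all (s, r) pairs drawn from the two ranges.
fn worst_case(n: u64, m: u64, s_range: Range<u64>, r_range: Range<u64>) -> Option<u64> {
    s_range
        .flat_map(|s| r_range.clone().map(move |r| (s, r)))
        .filter_map(|(s, r)| estimated_ops(n, m, s, r))
        .max()
}

fn main() {
    let (n, m) = (11_952, 2_271); // row length in bytes, number of rows
    for s_min in [1, 20] {
        let worst = worst_case(n, m, s_min..255, 0..255).unwrap();
        println!(
            "s in {s_min}..255, r in 0..255: worst case ~{:.1e} operations",
            worst as f64
        );
    }
}
```

Under these assumptions, raising the lower bound on s from 1 to 20 cuts the worst case from roughly 1.8e12 to roughly 5.1e9 estimated operations; the worst pair is always the smallest s with the largest r.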
PS: these lines at the start of binary_buffer were the key to finding this:
let o = (i_size / local_field_size) * (j_size / local_field_size) * (local_field_size + local_field_reach).pow(2);
println!("\nO() = {}", o);