parallel-processing multithreading rust
I'm working on a coding exercise where you have to count the characters in a slice of strings in parallel. The exercise ships with benchmarks to compare parallel against sequential performance.

I added the constraint of not using any threading library (crossbeam, rayon, etc.), just vanilla Rust.

This is what I've come up with so far:
```rust
#![feature(test)]
extern crate test;

pub mod parallel_letter_frequency {
    use std::collections::HashMap;
    use std::thread;

    const MIN_CHUNCK_SIZE: usize = 15;

    pub fn string_len<T: AsRef<str>>(strings: &[T]) -> HashMap<char, usize> {
        let mut dic = HashMap::new();
        for string in strings {
            for c in string
                .as_ref()
                .to_lowercase()
                .chars()
                .filter(|c| c.is_alphabetic())
            {
                *dic.entry(c).or_insert(0) += 1;
            }
        }
        dic
    }

    pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
        let chunk_size = input.len() / worker_count;

        match (worker_count, chunk_size) {
            (w, c) if w == 1 || c < MIN_CHUNCK_SIZE => string_len(input),
            _ => input
                .chunks(chunk_size)
                .map(|chunk| {
                    let vals = chunk.iter().map(|s| s.to_string()).collect::<Vec<_>>();
                    thread::spawn(move || -> HashMap<char, usize> { string_len(&vals) })
                })
                .map(|child| child.join().unwrap())
                .fold(HashMap::new(), |mut acc, val| {
                    val.iter()
                        .for_each(|(k, v)| *(acc).entry(*k).or_insert(0) += v);
                    acc
                }),
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::parallel_letter_frequency;
    use std::collections::HashMap;
    use test::Bencher;

    #[bench]
    fn bench_tiny_parallel(b: &mut Bencher) {
        let tiny = &["a"];
        b.iter(|| parallel_letter_frequency::frequency(tiny, 3));
    }

    #[bench]
    fn bench_tiny_sequential(b: &mut Bencher) {
        let tiny = &["a"];
        b.iter(|| frequency(tiny));
    }

    #[bench]
    fn bench_small_parallel(b: &mut Bencher) {
        let texts = all_texts(1);
        b.iter(|| parallel_letter_frequency::frequency(&texts, 3));
    }

    #[bench]
    fn bench_small_sequential(b: &mut Bencher) {
        let texts = all_texts(1);
        b.iter(|| frequency(&texts));
    }

    #[bench]
    fn bench_large_parallel(b: &mut Bencher) {
        let texts = all_texts(30);
        b.iter(|| parallel_letter_frequency::frequency(&texts, 3));
    }

    #[bench]
    fn bench_large_sequential(b: &mut Bencher) {
        let texts = all_texts(30);
        b.iter(|| frequency(&texts));
    }

    /// Simple sequential char frequency. Can it be beat?
    pub fn frequency(texts: &[&str]) -> HashMap<char, usize> {
        let mut map = HashMap::new();

        for line in texts {
            for chr in line.chars().filter(|c| c.is_alphabetic()) {
                if let Some(c) = chr.to_lowercase().next() {
                    (*map.entry(c).or_insert(0)) += 1;
                }
            }
        }

        map
    }

    fn all_texts(repeat: usize) -> Vec<&'static str> {
        [ODE_AN_DIE_FREUDE, WILHELMUS, STAR_SPANGLED_BANNER]
            .iter()
            .cycle()
            .take(3 * repeat)
            .flat_map(|anthem| anthem.iter().cloned())
            .collect()
    }

    // Poem by Friedrich Schiller. The corresponding music is the European Anthem.
    pub const ODE_AN_DIE_FREUDE: [&'static str; 8] = [
        "Freude schöner Götterfunken",
        "Tochter aus Elysium,",
        "Wir betreten feuertrunken,",
        "Himmlische, dein Heiligtum!",
        "Deine Zauber binden wieder",
        "Was die Mode streng geteilt;",
        "Alle Menschen werden Brüder,",
        "Wo dein sanfter Flügel weilt.",
    ];

    // Dutch national anthem
    pub const WILHELMUS: [&'static str; 8] = [
        "Wilhelmus van Nassouwe",
        "ben ik, van Duitsen bloed,",
        "den vaderland getrouwe",
        "blijf ik tot in den dood.",
        "Een Prinse van Oranje",
        "ben ik, vrij, onverveerd,",
        "den Koning van Hispanje",
        "heb ik altijd geëerd.",
    ];

    // American national anthem
    pub const STAR_SPANGLED_BANNER: [&'static str; 8] = [
        "O say can you see by the dawn's early light,",
        "What so proudly we hailed at the twilight's last gleaming,",
        "Whose broad stripes and bright stars through the perilous fight,",
        "O'er the ramparts we watched, were so gallantly streaming?",
        "And the rockets' red glare, the bombs bursting in air,",
        "Gave proof through the night that our flag was still there;",
        "O say does that star-spangled banner yet wave,",
        "O'er the land of the free and the home of the brave?",
    ];
}
```

Here are my benchmark results:
```
test bench_large_parallel   ... bench:     851,675 ns/iter (+/- 92,416)
test bench_large_sequential ... bench:     839,470 ns/iter (+/- 52,717)
test bench_small_parallel   ... bench:      22,488 ns/iter (+/- 5,062)
test bench_small_sequential ... bench:      28,692 ns/iter (+/- 1,406)
test bench_tiny_parallel    ... bench:          76 ns/iter (+/- 3)
test bench_tiny_sequential  ... bench:          66 ns/iter (+/- 3)
```

As you can see, sequential and parallel performance are very similar...
I have to copy every chunk before passing it to a thread so that I don't run into 'static lifetime issues, which surely hurts performance. How can I work around that?

I try to compensate for the thread-spawning overhead by computing small inputs on the main thread. Is that good practice?

What else is affecting performance here? What am I doing wrong?
This isn't a complete or particularly good answer, just a few assorted things I noticed. Maybe it helps you:
Because iterators are lazy, you only ever spawn one thread and immediately join it again. Try running this code (Playground):
```rust
(0..3)
    .map(|i| { println!("spawning {}", i); i })
    .map(|i| { println!("joining {}", i); i })
    .last();
```

The output might surprise you. You can read a bit about iterator laziness here, but unfortunately that doesn't explain this particular case.
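One common way around this (a minimal sketch under the same "std only" constraint, not code from the question) is to collect all the JoinHandles into a Vec before joining any of them, so every thread has been spawned before the first join blocks:

```rust
use std::thread;

fn main() {
    // `collect` drives the lazy iterator to completion, so all threads
    // are spawned up front...
    let handles: Vec<_> = (0..3)
        .map(|i| thread::spawn(move || i * 2))
        .collect();

    // ...and only then do we block on their results, one handle at a time.
    let results: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    println!("{:?}", results); // [0, 2, 4]
}
```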
Your parallel and sequential benchmarks don't do the same thing. Parallel:
```rust
for c in string
    .as_ref()
    .to_lowercase()
    .chars()
    .filter(|c| c.is_alphabetic())
{
    *dic.entry(c).or_insert(0) += 1;
}
```

Sequential:
```rust
for chr in line.chars().filter(|c| c.is_alphabetic()) {
    if let Some(c) = chr.to_lowercase().next() {
        (*map.entry(c).or_insert(0)) += 1;
    }
}
```

In particular, the parallel version calls to_lowercase on a str, which returns a String and therefore performs at least one heap allocation. That's a big no-no. Furthermore, the sequential version only uses the first char of the lowercased character, which is simply doing something different. It probably doesn't matter for your data set, but imagine Mr. Schiller had used an o plus U+0308 ◌̈ COMBINING DIAERESIS instead of U+00F6 LATIN SMALL LETTER O WITH DIAERESIS! Then you'd be in big trouble.
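To make the difference concrete, here is a small standalone sketch (my own example, not from the answer) of a character whose lowercase form consists of more than one char, so taking only the first char of char::to_lowercase() silently drops part of the result while str::to_lowercase() keeps it:

```rust
fn main() {
    // U+0130 LATIN CAPITAL LETTER I WITH DOT ABOVE lowercases to two chars:
    // 'i' followed by U+0307 COMBINING DOT ABOVE.
    let c = 'İ';

    // char::to_lowercase yields an iterator over all resulting chars...
    let full: String = c.to_lowercase().collect();
    // ...but taking only `.next()` drops the combining mark.
    let first_only = c.to_lowercase().next();

    println!("{:?} vs {:?}", full, first_only); // "i\u{307}" vs Some('i')

    // str::to_lowercase keeps both chars, so the two code paths diverge here.
    assert_eq!("İ".to_lowercase(), full);
}
```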
Why the strange check on worker_count and chunk_size? If I'm calculating correctly, neither your small nor even your tiny benchmark spawns multiple threads because of that check. So the parallel in those benchmark names is a lie.
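(For reference, my arithmetic: all_texts(1) yields 3 anthems × 8 lines = 24 strings, so chunk_size = 24 / 3 = 8, which is below MIN_CHUNCK_SIZE = 15; the tiny input gives chunk_size = 1 / 3 = 0. Both therefore take the sequential string_len branch on the main thread.)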
800 µs per iteration isn't very much work to begin with. Spawning threads takes time. Add to that all the heap allocations done to give each thread its own data, and the advantage of multithreading isn't very big.
As you said yourself: lots of heap allocations. Most of them could be avoided by using crossbeam::scoped, but since that isn't allowed, the next best option is to put all the data into an Arc and give each thread a range of it to work on. Since everything (apart from each thread's own hash map) is immutable, sharing it should be fairly easy.
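A minimal sketch of that Arc idea (my interpretation, not the answer's code; the input still has to be cloned once into owned Strings, because std::thread::spawn requires 'static data):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    // Clone the input once into shared, owned storage instead of once per chunk.
    let data: Arc<Vec<String>> = Arc::new(input.iter().map(|s| s.to_string()).collect());
    let chunk_size = (data.len() + worker_count.max(1) - 1) / worker_count.max(1);

    // Hand every worker an index range into the shared Vec, not a copy of it.
    let handles: Vec<_> = (0..worker_count)
        .map(|i| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                let start = (i * chunk_size).min(data.len());
                let end = ((i + 1) * chunk_size).min(data.len());
                let mut map = HashMap::new();
                for line in &data[start..end] {
                    for c in line.chars().filter(|c| c.is_alphabetic()) {
                        // char::to_lowercase avoids allocating a lowercased String
                        // and keeps every char it produces.
                        for lc in c.to_lowercase() {
                            *map.entry(lc).or_insert(0usize) += 1;
                        }
                    }
                }
                map
            })
        })
        .collect(); // collect first so all threads run before any join

    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .fold(HashMap::new(), |mut acc, map| {
            for (k, v) in map {
                *acc.entry(k).or_insert(0) += v;
            }
            acc
        })
}
```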
There's probably more to improve, but that's what I noticed for now. I hope this helps!