多个线程如何共享迭代器?

Net*_*ave 2 multithreading rust borrowing

我一直在研究一个函数,它将使用Rust和线程将一堆文件从源复制到目标.我在线程共享迭代器时遇到了一些麻烦.我还不习惯借用系统:

extern crate libc;
extern crate num_cpus;

use libc::{c_char, size_t};
use std::thread;
use std::fs::copy;

fn python_str_array_2_str_vec<T, U, V>(_: T, _: U) -> V {
    unimplemented!()
}

#[no_mangle]
pub extern "C" fn copyFiles(
    sources: *const *const c_char,
    destinies: *const *const c_char,
    array_len: size_t,
) {
    let src: Vec<&str> = python_str_array_2_str_vec(sources, array_len);
    let dst: Vec<&str> = python_str_array_2_str_vec(destinies, array_len);
    let mut iter = src.iter().zip(dst);
    let num_threads = num_cpus::get();
    let threads = (0..num_threads).map(|_| {
        thread::spawn(|| while let Some((s, d)) = iter.next() {
            copy(s, d);
        })
    });
    for t in threads {
        t.join();
    }
}

fn main() {}
Run Code Online (Sandbox Code Playgroud)

我收到了这个我无法解决的编译错误:

error[E0597]: `src` does not live long enough
  --> src/main.rs:20:20
   |
20 |     let mut iter = src.iter().zip(dst);
   |                    ^^^ does not live long enough
...
30 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0373]: closure may outlive the current function, but it borrows `**iter`, which is owned by the current function
  --> src/main.rs:23:23
   |
23 |         thread::spawn(|| while let Some((s, d)) = iter.next() {
   |                       ^^                          ---- `**iter` is borrowed here
   |                       |
   |                       may outlive borrowed value `**iter`
   |
help: to force the closure to take ownership of `**iter` (and any other referenced variables), use the `move` keyword, as shown:
   |         thread::spawn(move || while let Some((s, d)) = iter.next() {
Run Code Online (Sandbox Code Playgroud)

我已经看到了以下问题:

当使用 我没有使用的多个线程时,值的存活时间不够长chunks,我想尝试通过线程共享迭代器,尽管创建块将它们传递给线程将是经典的解决方案.

无法在线程之间发送&str因为它没有足够长的时间 我已经看到了一些使用通道与线程通信的答案,但我不太确定使用它们.应该有一种通过线程共享一个对象的简单方法.

为什么一个局部变量没有足够长的时间用于thread :: scoped 这引起了我的注意,scoped应该修复我的错误,但是因为它在不稳定的通道中我想看看是否还有另一种方法来做它只是使用spawn.

有人可以解释我应该如何修复生命周期,以便可以从线程访问迭代器?

She*_*ter 5

这是你问题的MCVE:

use std::thread;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];
    let mut iter = src.iter().zip(dst);
    thread::spawn(|| {
        while let Some((s, d)) = iter.next() {
            println!("{} -> {}", s, d);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

有许多相关问题:

  1. 迭代器存在于堆栈中,线程的闭包引用它.
  2. 闭包对迭代器进行了可变引用.
  3. 迭代器本身有一个Vec存在于堆栈中的引用.
  4. Vec本身引用了可能存在于堆栈中的字符串切片,但不能保证比线程更长寿.

换句话说,Rust编译器已经阻止你执行四个独立的内存不安全.

要识别的主要事情是,您生成的任何线程可能比生成它的地方寿命更长.即使您立即打电话join,编译器也无法静态验证是否会发生这种情况,因此必须采取保守的路径.这是作用域线程的重点 - 它们保证线程在它们启动的堆栈帧之前退出.

此外,您尝试在多个并发线程中使用可变引用.还有保证迭代器(或任何它是建立在迭代器)可以安全地称为并行.这是完全可能的,两个线程调用next完全相同的同一时间.这两段代码并行运行并写入相同的内存地址.一个线程写入一半数据而另一个线程写入另一半,现在您的程序将在某个任意点崩溃.

使用像crossbeam这样的工具,你的代码看起来像:

extern crate crossbeam;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];

    let mut iter = src.iter().zip(dst);
    while let Some((s, d)) = iter.next() {
        crossbeam::scope(|scope| {
            scope.spawn(|| {
                println!("{} -> {}", s, d);
            });
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

如上所述,这将只生成一个线程,等待它完成.获得更多并行性(本练习的通常要点)的另一种方法是将调用交换为nextspawn.这需要转移所有权s,并d通过该线程move的关键字:

extern crate crossbeam;

fn main() {
    let src = vec!["one", "alpha"];
    let dst = vec!["two", "beta"];

    let mut iter = src.iter().zip(dst);
    crossbeam::scope(|scope| {
        while let Some((s, d)) = iter.next() {
            scope.spawn(move || {
                println!("{} -> {}", s, d);
            });
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

如果你在里面添加一个sleep调用spawn,你可以看到并行运行的线程.

我已经用for循环编写了它,但是:

let iter = src.iter().zip(dst);
crossbeam::scope(|scope| {
    for (s, d) in iter {
        scope.spawn(move || {
            println!("{} -> {}", s, d);
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

最后,迭代器在当前线程上运行,然后从迭代器返回的每个值都传递给新线程.保证新线程在捕获的引用之前退出.

您可能对Rayon感兴趣,这是一个允许轻松并行化某些类型迭代器的板条箱.

也可以看看:


归档时间:

查看次数:

837 次

最近记录:

8 年,1 月 前