如何将 ParallelIterator 转换回顺序 Iterator?

Tho*_*mas 5 rust rayon

我正在迭代数据库中的几 GB 输入项。对于每个输入项,我都会进行一些 CPU 密集型处理,从而产生一个或多个新的输出项,总计数十 GB。然后输出项存储在另一个数据库表中。

通过使用 Rayon 进行并行处理,我获得了很好的加速。然而,数据库API不是线程安全的;it'sSend但不是Sync,因此 I/O 必须序列化。

理想情况下,我只想写:

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .ser_bridge() // End parallelism. This function does not exist.
    .for_each(|output_item| {
        output_database.write_item(output_item);
    });
Run Code Online (Sandbox Code Playgroud)

基本上我想要相反的par_bridge();在调用它的线程上运行的东西,从每个线程读取项目并串行生成它们。但在Rayon目前的实现中,这似乎并不存在。我不确定这是否是因为理论上不可能,或者是否不适合当前的库设计。

输出太大,无法将其全部收集到Vec第一个中;它需要直接流式传输到数据库中。

顺便说一句,我没有和 Rayon 结婚;我和 Rayon 没有结婚。如果有另一个更合适的板条箱,我很乐意进行更换。

hel*_*low 1

我认为顺序并不重要,因此您不需要输出数据的顺序。

您可以使用 ampsc::channel将数据从for_each闭包传输到数据库 api,例如

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each(move |output_item| {
        tx.send(output_item).unwrap();
    });
Run Code Online (Sandbox Code Playgroud)

在第二个线程中,您可以使用该rx变量来接收数据并将其写入数据库。