我正在迭代数据库中的几 GB 输入项。对于每个输入项,我都会进行一些 CPU 密集型处理,从而产生一个或多个新的输出项,总计数十 GB。然后输出项存储在另一个数据库表中。
通过使用 Rayon 进行并行处理,我获得了很好的加速。然而,数据库API不是线程安全的;it'sSend
但不是Sync
,因此 I/O 必须序列化。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
Run Code Online (Sandbox Code Playgroud)
基本上我想要相反的par_bridge()
;在调用它的线程上运行的东西,从每个线程读取项目并串行生成它们。但在Rayon目前的实现中,这似乎并不存在。我不确定这是否是因为理论上不可能,或者是否不适合当前的库设计。
输出太大,无法将其全部收集到Vec
第一个中;它需要直接流式传输到数据库中。
顺便说一句,我没有和 Rayon 结婚;我和 Rayon 没有结婚。如果有另一个更合适的板条箱,我很乐意进行更换。
我认为顺序并不重要,因此您不需要输出数据的顺序。
您可以使用 ampsc::channel
将数据从for_each
闭包传输到数据库 api,例如
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
Run Code Online (Sandbox Code Playgroud)
在第二个线程中,您可以使用该rx
变量来接收数据并将其写入数据库。
归档时间: |
|
查看次数: |
461 次 |
最近记录: |