小编hel*_*i77的帖子

如何在 Reactor 中进行多线程文件处理

我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作量发生在调用中flatMap,然后 Flux 被转换和过滤。

每当我尝试订阅生成的 Flux 时,主线程都会在我收到任何值之前退出。

Flux.fromStream(Files.list(Paths.get("directory"))
    .flatMap(path -> { 
        return Flux.create(sink -> {
            try (
                RandomAccessFile file = new RandomAccessFile(new File(path), "r");
                FileChannel fileChannel = file.getChannel()
            ) {
                // Process file into tokens
                sink.next(new Token(".."));
            } catch (IOException e) {
                sink.error(e);
            } finally {
                sink.complete();
            }
        }).subscribeOn(Schedulers.boundedElastic());
    })
    .map(token -> /* Transform tokens */)
    .filter(token -> /* Filter tokens*/)
    .subscribe(token -> /* Store tokens in list */)

Run Code Online (Sandbox Code Playgroud)

我希望在我的列表中找到处理管道的输出,但程序立即退出。首先,我想知道我是否正确使用了 Flux 类,其次我将如何等待订阅调用完成?

java blocking project-reactor

2
推荐指数
1
解决办法
1081
查看次数

标签 统计

blocking ×1

java ×1

project-reactor ×1