Kay*_*ayV 5 java multithreading scheduler project-reactor reactive-streams
我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。
使用elastic、parallel 和newElastic 启动Schedular 有很多选项。
这是我使用的代码:
System.out.println("------ elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2)
.publishOn(Schedulers.elastic()).log()
.blockLast();
System.out.println("------ new elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2).log()
.publishOn(Schedulers.newElastic("my")).log()
.blockLast();
Run Code Online (Sandbox Code Playgroud)
并且他们都有相同的文档:
调度程序动态创建基于 ExecutorService 的 Workers 并缓存线程池,并在 Workers 关闭后重用它们。
创建线程池的最大数量没有限制。
未使用的线程池的默认生存时间为 60 秒,使用适当的工厂推送不同的值。
该调度程序不可重新启动。
这是他们两个的日志:
------ elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(2)
[ INFO] (elastic-2) | onNext(2)
[ INFO] (elastic-2) | onNext(3)
[ INFO] (elastic-2) | onNext(3)
[ INFO] (elastic-2) | onNext(4)
[ INFO] (elastic-2) | onNext(4)
[ INFO] (elastic-2) | onNext(5)
[ INFO] (elastic-2) | onComplete()
------ new elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(0)
[ INFO] (main) | onNext(1)
[ INFO] (my-4) | onNext(0)
[ INFO] (main) | onNext(1)
[ INFO] (my-4) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (my-4) | onNext(1)
[ INFO] (my-4) | onNext(2)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (my-4) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (my-4) | onNext(3)
[ INFO] (my-4) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (my-4) | onNext(4)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
[ INFO] (my-4) | onNext(4)
[ INFO] (main) | onComplete()
[ INFO] (my-4) | onNext(5)
[ INFO] (my-4) | onComplete()
Run Code Online (Sandbox Code Playgroud)
两者有什么区别?
该elastic()函数返回一个共享调度程序实例。这意味着对此函数的多次调用将返回相同的调度程序。
以 为前缀的函数new将始终创建一个新的调度程序实例。
在此处查看该类的文档Schedulers:https://projectreactor.io/docs/core/release/api/
| 归档时间: |
|
| 查看次数: |
7508 次 |
| 最近记录: |