pet*_*erl 7 java amazon-dynamodb apache-kafka project-reactor aws-sdk-java-2.0
Project Reactor 通过定义一个Scheduler. 它还为使用CompletableFuture's 的库提供了一个桥梁Mono.fromFuture(..)。
AWS 的DyanmoDB 异步客户端执行CompletableFuture它从 API 调用返回的java.util.concurrent.Executor. 默认情况下,它创建一个Executor由它也创建的线程池支持。其结果是,即使是定义数据流Scheduler就像Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())从库中创建,而不是一个从池中一个线程中执行Schedulers.boundedElastic()。所以我们看到线程名称像sdk-async-response-0-2,而不是像boundedElastic-1.
幸运的是,该库允许我们提供我们自己的Executor,如下所示,所以我的问题是:
您如何构建一个在运行时使用流的那部分上定义
Executor的线程的线程?Scheduler
用例
我们有一个存储库类,它有一个findById方法,我们需要调用者能够控制在哪个Scheduler上运行,因为它在这些截然不同的上下文中使用:
Schedulers.boundedElastic()调度程序上运行的 API 响应。尝试
我们已经尝试定义一个ExecutorusingSchedulers.immediate()和Runnable::run,如下所示,但两者都导致在 Netty 事件循环线程(示例名称:)上执行aws-java-sdk-NettyEventLoop-0-2,而不是来自定义的Scheduler.
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
runnable -> Schedulers.immediate().schedule(runnable)
))
.build();
Run Code Online (Sandbox Code Playgroud)
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Runnable::run
))
.build();
Run Code Online (Sandbox Code Playgroud)
Ole*_*uka 18
调查这个问题,我看到在特定线程上执行后需要观察元素。准确地说,在这种情况下观察意味着*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个正确的操作符,就像这样,但在 Project Reactor 中,我们将相同的操作称为publishOn。
因此,
*如果你要处理的数据*上Schedulers.boundedElastic(),那么你应该使用下列建设
Mono.fromFuture(..)
.publishOn(Schedulers.boundedElastic())
Run Code Online (Sandbox Code Playgroud)
.subscribeOn也有效???阅读之前的构造,您可能会开始担心,因为您 100% 确定
Mono.fromRunnable(..)
.subscribeOn(Schedulers.boundedElastic())
Run Code Online (Sandbox Code Playgroud)
onNext在线程上发送boundedElastic-1,所以有什么问题一样fromFuture。
这里有一个技巧:
subscribeOnwith Futures/CompletableFuture或任何可以在下面使用自己的异步机制的东西如果我们看看后面发生subscribeOn了什么,你会发现如下内容:
// Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Scheduler scheduler;
Publisher<T> parent;
scheduler.schedule(() -> parent.subscribe(actual));
}
Run Code Online (Sandbox Code Playgroud)
这基本上意味着subscribe将在单独的线程上调用parent 的方法。
这种技术适用于fromRunnable, fromSupplier,fromCallable因为它们的逻辑发生在subscribe方法中:
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(sds);
// skiped some parts
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
Run Code Online (Sandbox Code Playgroud)
这意味着它几乎等于
scheduler.schedule(() -> {
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
})
Run Code Online (Sandbox Code Playgroud)
相比之下,fromFuture工作起来要棘手得多。一个简短的测验。
我们可以在哪个线程上观察到一个值?(假设在线程 Main 上执行,并且任务在 ForkJoinPool 上执行)
var future = CompletableFuture
.supplyAsync(() -> {
return value;
})
... // some code here, does not metter just code
future.thenAccept(value -> {
System.out.println(Thread.currentThread())
});
Run Code Online (Sandbox Code Playgroud)
和正确答案......
它可能是 Thread Main
也可能是 ForkJoinPool 中的 Thread
......
因为它是活泼的......而且在这一点上,我们消耗了价值,价值可能已经交付,所以我们只是volatile在读取器线程(线程 Main)上读取字段),否则,线程 Main 将设置一个,acceptor以便稍后在ForkJoinPool线程上调用接受器。
是的,这就是为什么当您使用fromFuturewith 时subscribeOn,不能保证subscribeOn线程会观察给定的值CompletableFuture。
这就是为什么publishOn是确保在所需线程上进行值处理的唯一方法。
publishOn下去吗???是和否。这取决于。
如果您使用Mono- 在 99% 的情况下,publishOn如果您想确保您的数据处理发生在特定线程上,您可以使用- 始终使用publishOn.
不要担心潜在的开销,即使您不小心使用了 Project Reactor,它也会照顾您。Project Reactor 有几个优化可以在运行时替换你publishOn的subscribeOn(如果它是安全的而不破坏行为),所以你会得到最好的。
Scheduelrs的兔子洞Schedulers.immediate()它几乎是无操作调度程序,基本上可以
Schedulers.immediate().scheduler(runnable) {
runnable.run()
}
Run Code Online (Sandbox Code Playgroud)
是的,它对反应堆用户没有任何用处,我们仅将其用于内部需求。
有两种选择:
1.a) 创建您的有界Executor. (例如Executors.fixed...)
1.b)ScheduledExecutorService如果你想获得周期性任务和延迟任务的力量,创建你的有界
2)Scheduler使用Schedulers.fromExecutorXXXAPI从你的执行者创建一个
3)Executor在命令式世界中使用你的有界,使用你的Scheduler它是反应世界的界一个
即将推出...
即将推出