dav*_*ooh 23 java reactor reactive-programming project-reactor
我是新手,一般都会对Reactor和反应式编程进行预测.
我目前正在编写一段类似于此的代码:
Mono.just(userId)
.map(repo::findById)
.map(user-> {
if(user == null){
throw new UserNotFoundException();
}
return user;
})
// ... other mappings
Run Code Online (Sandbox Code Playgroud)
这个例子可能很愚蠢,实现这种情况肯定有更好的方法,但重点是:
throw new在map块中使用异常是否错误,或者我应该用return Mono.error(new UserNotFoundException())?替换它?
这两种做法有什么实际区别吗?
Ole*_*uka 42
有两种方法可以被视为一种方便的异常抛出方式:
Flux/Mono.handle操作员可以简化可能导致错误或空流的元素处理的方法之一handle.
以下代码显示了我们如何使用它来解决我们的问题:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
Run Code Online (Sandbox Code Playgroud)
我们可以看到,.handle运算符需要传递BiConsumer<T, SynchronousSink<>才能处理元素.这里我们有BiConsumer中的参数.第一个是来自上游的元素,第二个SynchronousSink元素帮助我们同步向下游提供元素.这种技术扩展了提供元素处理的不同结果的能力.例如,在元素无效的情况下,我们可以向其提供错误,该错误SycnchronousSync将取消上游onError并向下游产生信号.反过来,我们可以使用相同的handle运算符"过滤" .一旦BiConsumer执行了句柄并且没有提供任何元素,Reactor会将其视为一种过滤,并将为我们请求一个额外的元素.最后,如果元素有效,我们可以简单地调用SynchronousSink#next和传播我们的元素下游或在其上应用一些映射,因此我们将handle在map此处作为运算符.此外,我们可以安全地使用该操作符,而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误.
#concatMap+Mono.error在映射期间抛出异常的一个选项是替换map为concatMap.从本质上讲,concatMap几乎一样flatMap.唯一的区别是concatMap一次只允许一个子流.这种行为大大简化了内部实现,不会影响性能.因此,我们可以使用以下代码以更实用的方式抛出异常:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
Run Code Online (Sandbox Code Playgroud)
在上面的示例中,如果用户无效,我们将使用返回异常Mono.error.我们可以使用相同的助焊剂Flux.error:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
Run Code Online (Sandbox Code Playgroud)
注意,在这两种情况下,我们都返回只有一个元素的冷流.在Reactor中,有一些优化可以在返回流是冷标量流的情况下提高性能.因此,建议使用流量/单声道concatMap + .just,empty,error其结果是,当我们需要更复杂的映射,这可能与最终return null或throw new ....
注意!不要在可空性上检查传入的元素.Reactor Project永远不会
null为您发送值,因为此暴力Reactive Streams规范(参见规则2.13)因此,如果repo.findById返回null,Reactor将为您抛出NPE异常.
concatMap比这更好flatMap?从本质上讲,flatMap它旨在合并来自一次执行的多个子流中的元素.这意味着flatMap应该具有异步流,因此,它们可能潜在地处理多个线程上的数据,或者可能是多个网络调用.随后,这样的期望会对实现产生很大的影响,因此flatMap应该能够处理来自多个流的数据Thread(意味着使用并发数据结构),如果从另一个流中流失,则将元素排入队列(意味着Queue每个流的s都有额外的内存分配)subream)并且不违反Reactive Streams规范规则(意味着真正复杂的实现).计算所有这些事实以及我们将普通map操作(同步)替换为更方便的抛出异常Flux/Mono.error(不改变执行的同步性)的事实导致我们不需要这样复杂的运算符我们可以使用更简单concatMap的设计用于一次异步处理单个流,并进行一些优化以处理标量冷流.
switchOnEmpty因此,当结果为空时抛出异常的另一种方法是switchOnEmpty运算符.以下代码演示了我们如何使用该方法:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
Run Code Online (Sandbox Code Playgroud)
正如我们所看到的,在这种情况下repo::findById应该具有Mono的User返回类型.因此,如果User找不到实例,结果流将为空.因此,Reactor将调用Mono指定为switchIfEmpty参数的替代方法.
它可以被视为不太可读的代码或不良做法,但您可以按原样抛出异常.此模式违反了Reactive Streams规范,但reactor将捕获抛出的异常,并将其作为Subscriber信号传播到下游
Subscriber运算符以提供复杂的元素处理onNext+ onError,但这种技术最适合异步元素处理的情况..handle+concatMapMono.errorflatMap因为返回类型是被禁止的,所以不是Mono.error在你的下游,flatMap你会得到意想不到Null的nullmap如果调用某个特定函数的结果使用空流完成,则在需要发送错误信号的所有情况下使用