用Reactor抛出异常的正确方法

dav*_*ooh 23 java reactor reactive-programming project-reactor

我是新手,一般都会对Reactor和反应式编程进行预测.

我目前正在编写一段类似于此的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings
Run Code Online (Sandbox Code Playgroud)

这个例子可能很愚蠢,实现这种情况肯定有更好的方法,但重点是:

throw newmap块中使用异常是否错误,或者我应该用return Mono.error(new UserNotFoundException())?替换它?

这两种做法有什么实际区别吗?

Ole*_*uka 42

有两种方法可以被视为一种方便的异常抛出方式:

使用处理元素 Flux/Mono.handle

操作员可以简化可能导致错误或空流的元素处理的方法之一handle.

以下代码显示了我们如何使用它来解决我们的问题:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })
Run Code Online (Sandbox Code Playgroud)

我们可以看到,.handle运算符需要传递BiConsumer<T, SynchronousSink<>才能处理元素.这里我们有BiConsumer中的参数.第一个是来自上游的元素,第二个SynchronousSink元素帮助我们同步向下游提供元素.这种技术扩展了提供元素处理的不同结果的能力.例如,在元素无效的情况下,我们可以向其提供错误,该错误SycnchronousSync将取消上游onError并向下游产生信号.反过来,我们可以使用相同的handle运算符"过滤" .一旦BiConsumer执行了句柄并且没有提供任何元素,Reactor会将其视为一种过滤,并将为我们请求一个额外的元素.最后,如果元素有效,我们可以简单地调用SynchronousSink#next和传播我们的元素下游或在其上应用一些映射,因此我们将handlemap此处作为运算符.此外,我们可以安全地使用该操作符,而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误.

引发使用#concatMap+Mono.error

在映射期间抛出异常的一个选项是替换mapconcatMap.从本质上讲,concatMap几乎一样flatMap.唯一的区别是concatMap一次只允许一个子流.这种行为大大简化了内部实现,不会影响性能.因此,我们可以使用以下代码以更实用的方式抛出异常:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })
Run Code Online (Sandbox Code Playgroud)

在上面的示例中,如果用户无效,我们将使用返回异常Mono.error.我们可以使用相同的助焊剂Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })
Run Code Online (Sandbox Code Playgroud)

注意,在这两种情况下,我们都返回只有一个元素的冷流.在Reactor中,有一些优化可以在返回流是冷标量流的情况下提高性能.因此,建议使用流量/单声道concatMap + .just,empty,error其结果是,当我们需要更复杂的映射,这可能与最终return nullthrow new ....

注意!不要在可空性上检查传入的元素.Reactor Project永远不会null为您发送值,因为此暴力Reactive Streams规范(参见规则2.13)因此,如果repo.findById返回null,Reactor将为您抛出NPE异常.

等等,为什么concatMap比这更好flatMap

从本质上讲,flatMap它旨在合并来自一次执行的多个子流中的元素.这意味着flatMap应该具有异步流,因此,它们可能潜在地处理多个线程上的数据,或者可能是多个网络调用.随后,这样的期望会对实现产生很大的影响,因此flatMap应该能够处理来自多个流的数据Thread(意味着使用并发数据结构),如果从另一个流中流失,则将元素排入队列(意味着Queue每个流的s都有额外的内存分配)subream)并且不违反Reactive Streams规范规则(意味着真正复杂的实现).计算所有这些事实以及我们将普通map操作(同步)替换为更方便的抛出异常Flux/Mono.error(不改变执行的同步性)的事实导致我们不需要这样复杂的运算符我们可以使用更简单concatMap的设计用于一次异步处理单个流,并进行一些优化以处理标量冷流.

使用引发异常 switchOnEmpty

因此,当结果为空时抛出异常的另一种方法是switchOnEmpty运算符.以下代码演示了我们如何使用该方法:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))
Run Code Online (Sandbox Code Playgroud)

正如我们所看到的,在这种情况下repo::findById应该具有MonoUser返回类型.因此,如果User找不到实例,结果流将为空.因此,Reactor将调用Mono指定为switchIfEmpty参数的替代方法.

按原样抛出异常

它可以被视为不太可读的代码或不良做法,但您可以按原样抛出异常.此模式违反了Reactive Streams规范,但reactor将捕获抛出的异常,并将其作为Subscriber信号传播到下游

小贴士

  1. 使用Subscriber运算符以提供复杂的元素处理
  2. 当我们需要在映射期间抛出异常时使用onNext+ onError,但这种技术最适合异步元素处理的情况.
  3. 当我们已经到位时使用.handle+concatMapMono.error
  4. flatMap因为返回类型是被禁止的,所以不是Mono.error在你的下游,flatMap你会得到意想不到Nullnull
  5. map如果调用某个特定函数的结果使用流完成,则在需要发送错误信号的所有情况下使用

  • @OlehDokuka好答案!非常清楚。非常感谢你! (4认同)
  • @JinKwon 是的。在“Flux”中,我们仍然有“concatMap”。在“Mono”中,相同的方法称为“flatMap” (3认同)
  • @OlehDokuka我相信它已被添加到较新版本之一的反应器中,但我建议在使用“.swithIfEmpty”抛出异常时使用供应商,否则会急切地创建异常,即使永远不会达到 if-empty 。 (2认同)
  • 我们还有 `concatMap()` 方法吗? (2认同)
  • @OlehDokuka 这是一个很好的答案,谢谢!但是,您应该使用“Mono”*修复*您的示例,因为那里没有“concatMap()”方法。现在很令人困惑,您的代码在复制并粘贴到 Java 编译器时不起作用。 (2认同)