在使用Flux(包括重试)消耗时按顺序调用非阻塞操作

Raj*_*ani 9 java reactive-programming apache-kafka project-reactor spring-webflux

因此,我的用例是在使用Project Reactor以反应式样式编程时使用Spring Webflux应用程序中的Kafka消息,并按照从Kafka收到消息的相同顺序对每个消息执行非阻塞操作.系统也应该能够自行恢复.

以下是设置为使用的代码段:

    Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    });

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我所做的是:从Kafka获取消息,将其转换为用于新目标的对象,然后将其发送到目标,然后确认偏移以将消息标记为已消耗和处理.以与从Kafka消耗的消息相同的顺序确认偏移量是至关重要的,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地).因此我用a flatMapSequential来确保这一点.

为简单起见,我们假设该transformToOutputFormat()方法是一种身份变换.

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}
Run Code Online (Sandbox Code Playgroud)

performAction()方法需要通过网络执行某些操作,例如调用HTTP REST API.因此适当的API返回Mono,这意味着需要订阅链.此外,我需要ReceiverRecord通过此方法返回,以便可以在上面的flatMapSequential()运算符中确认偏移量.因为我需要Mono订阅,我正在flatMapSequential上面使用.如果没有,我可以用一个map代替.

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));
Run Code Online (Sandbox Code Playgroud)

我在这个方法中有两个相互矛盾的需求:1.订阅进行HTTP调用的链2.返回ReceiverRecord

使用flatMap()意味着我的返回类型更改为Mono.在同一个地方使用doOnNext()将保留链中的ReceiverRecord,但不允许自动订阅HttpClient响应.

我不能添加.subscribe()asString(),因为我要等到之前被确认偏移HTTP响应完全接收.

我不能使用.block()它,因为它在并行线程上运行.

因此,我需要作弊并record从方法范围返回对象.

另一件事是在内部重试performAction时切换线程.由于flatMapSequential()急切地订阅外部通量中的每个Mono,这意味着虽然可以按顺序保证对偏移的确认,但我们不能保证HTTP调用performAction将以相同的顺序执行.

所以我有两个问题.

  1. 是否有可能以record自然的方式返回而不是返回方法范围对象?
  2. 是否可以确保HTTP调用和偏移确认的执行顺序与发生这些操作的消息的顺序相同?

Raj*_*ani 11

这是我想出的解决方案。

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();
Run Code Online (Sandbox Code Playgroud)

我没有使用 flatMapSequential 订阅 performAction Mono 并保留序列,而是将来自 Kafka 接收器的更多消息的请求延迟到执行操作。这实现了我需要的一次一个处理。

因此,performAction 不需要返回 ReceiverRecord 的 Mono。我还将其简化为以下内容:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + receiverRecord.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
Run Code Online (Sandbox Code Playgroud)