无限流量和批量写入数据库

dfo*_*rsl 5 java flow-control apache-kafka project-reactor

在继续实际事件处理之前,我有一个无限的 Flux(来自使用 reactor-kafka 的 kafka)事件,我试图将这些事件批量写入数据库。我的问题是让它在适当的背压下工作。

windowTimeout并且bufferTimeout似乎是不错的候选人,因为它们允许我指定最大大小,而且还限制了在“流量”低的情况下等待的时间。

首先是windowTimeout,从中对数据库进行了批量写入。然而,这很快就出现了问题:reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...))

然后我切换到bufferTimeout,但没有成功,出现错误reactor.core.Exceptions$OverflowException:无法发出缓冲区由于缺少请求

我希望以下说明我所追求的流程:

flux.groupBy(envelope -> envelope.partition)
  .flatMap(partitionFlux -> {
    final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
    final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
      .concatMap(batch -> {
        final ConsumedEnvelope last = batch.get(batch.size() - 1);

        return repository.persist(batch) // a)
          .then(last.acknowledge()) // b)
          .thenReturn(batch);
      });

    return processing(batchFlux);
  })
  .subscribe(result -> {
      // ...
  });

Run Code Online (Sandbox Code Playgroud)

a)repository.persist在内部除了迭代批处理以创建插入操作之外什么都不做,然后返回一个Mono<Void>.

b) ConsumedEnvelope.acknowledge() 用于 Kafka 偏移,我只想在成功持久化批处理后执行此操作。所有这些都包含在concatMap每个分区一次只处理一个批次中。

如上所述,这会导致溢出异常。是否有任何惯用的方法来实现我试图描述的内容?在我看来,这不应该是一项非常罕见的任务,但我是反应堆的新手,很想得到一些建议。

/d

编辑我意识到简单地添加onBackpressureBuffer实际上可以为我解决这个问题。但总的来说,有没有更好的方法来做到这一点?

编辑 2 ...由于未绑定的需求,上述内容当然会导致问题,我不知何故错过了。因此,回到最初的问题,或者可能是某种方式让 onBackpressureBuffer 不请求未绑定的需求,而只转发下游请求的内容。