PVS*_*PVS 4 java webclient reactor project-reactor
我有一个要求,我使用 Spring Batch 从 SQL DB 读取一堆行(数千),并在将它们写入 Kafka 主题之前调用 REST 服务来丰富内容。
使用 Spring Reactive webClient 时,如何限制活动非阻塞服务调用的次数?在使用 Spring Batch 读取数据后,我应该以某种方式在循环中引入 Flux 吗?
(我理解 delayElements 的用法并且它有不同的用途,因为当单个 Get Service Call 带来大量数据并且您希望服务器减慢速度时——尽管如此,我的用例有点不同,因为我有许多 WebClient 调用要进行,并希望限制调用次数以避免内存不足问题,但仍能获得非阻塞调用的优势)。
很有趣的问题。我考虑了一下,我想到了一些关于如何做到这一点的想法。我将分享我对此的想法,希望这里有一些想法可以帮助您进行调查。
不幸的是,我不熟悉 Spring Batch。然而,这听起来像是速率限制问题,或者经典的生产者-消费者问题。
所以,我们有一个生产者,它产生了太多我们的消费者跟不上的消息,中间的缓冲变得难以忍受。
我看到的问题是,正如您所描述的,您的 Spring Batch 流程不是作为流或管道工作的,而是您的反应式 Web 客户端。
因此,如果我们能够以流的形式读取数据,那么当记录开始进入管道时,这些记录将被响应式 Web 客户端处理,并且使用背压,我们可以控制来自生产者/数据库的流的流动边。
制作方
所以,我要改变的第一件事是如何从数据库中提取记录。我们需要控制当时有多少记录从数据库中读取,通过分页我们的数据检索或控制获取大小,然后在背压下控制通过反应管道向下游发送的记录数。
因此,请考虑以下(基本的)数据库数据检索,包装在Flux.
Flux<String> getData(DataSource ds) {
return Flux.create(sink -> {
try {
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
sink.onRequest(batchSize -> {
try {
for (int i = 0; i < batchSize; i++) {
if (!rs.next()) {
//no more data, close resources!
rs.close();
stm.close();
con.close();
sink.complete();
break;
}
sink.next(rs.getString(1));
}
} catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
Run Code Online (Sandbox Code Playgroud)
在上面的例子中:
batchSize),然后等待它使用背压请求更多。sink.onCancel, sink.onDispose) ,您也可以考虑做一些事情,因为关闭连接和其他资源是这里的基础。消费者方面
在消费者端,您注册了一个订阅者,该订阅者当时仅以 1000 条的速度请求消息,并且只有在处理完该批次后才会请求更多消息。
getData(source).subscribe(new BaseSubscriber<String>() {
private int messages = 0;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1000);
}
@Override
protected void hookOnNext(String value) {
//make http request
System.out.println(value);
messages++;
if(messages % 1000 == 0) {
//when we're done with a batch
//then we're ready to request for more
upstream().request(1000);
}
}
});
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,当订阅开始时,它请求第一批 1000 条消息。在onNext我们处理第一批时,使用 Web 客户端发出 http 请求。
批次完成后,我们会向发布者请求另外一批 1000,依此类推。
你有它!使用背压可以控制当时有多少打开的 HTTP 请求。
我的示例非常初级,需要一些额外的工作才能使其做好生产准备,但我相信这有望提供一些可以适应您的 Spring Batch 场景的想法。
| 归档时间: |
|
| 查看次数: |
2602 次 |
| 最近记录: |