Jav*_*vaD 5 spring reactive-programming apache-kafka spring-boot spring-webflux
我正在开发 REST 服务,该服务反过来会查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还期望大量负载,因此我正在考虑异步/非阻塞方法,以避免数百个“servlet”线程在调用慢速系统时被阻塞。
据我所知,这可以使用新 servlet API 规范中存在的 AsyncContext 来实现。我什至开发了一个小型原型,它似乎正在工作。
另一方面,看起来我可以使用 Spring WebFlux 实现相同的目标。不幸的是,我没有找到任何使用 Mono/Flux 包装自定义“后端”调用的示例。大多数示例只是重用已经准备好的反应式连接器,例如 ReactiveCassandraOperations.java 等。
我的数据流如下:
JS 客户端 --> Spring RestController --> 向 Kafka 主题发送请求 --> 从 Kafka 回复主题读取响应 --> 返回数据给客户端
我可以将 Kafka 步骤封装到 Mono/Flux 中吗?如何做到这一点?我的 RestController 方法应该是什么样子?
这是我的简单实现,它使用 Servlet 3.1 API 实现了相同的效果
//took the idea from some Jetty examples
public class AsyncRestServlet extends HttpServlet {
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String result = (String) req.getAttribute(RESULTS_ATTR);
if (result == null) { //data not ready yet: schedule async processing
final AsyncContext async = req.startAsync();
//generate some unique request ID
String uid = "req-" + String.valueOf(req.hashCode());
//share it to Kafka receive together with AsyncContext
//when Kafka receiver will get the response it will put it in Servlet request attribute and call async.dispatch()
//This doGet() method will be called again and it will send the response to client
receiver.rememberKey(uid, async);
//send request to Kafka
sender.send(uid, param);
//data is not ready yet so we are releasing Servlet thread
return;
}
//return result as html response
resp.setContentType("text/html");
PrintWriter out = resp.getWriter();
out.println(result);
out.close();
}
Run Code Online (Sandbox Code Playgroud)
这是一个简短的示例 - 不是您可能想到的 WebFlux 客户端,但至少它可以让您利用 Flux 和 Mono 进行异步处理,我将其解释为您问题的重点。Web 对象应该无需额外配置即可工作,但当然您需要配置 Kafka,因为 KafkaTemplate 对象无法单独工作。
@Bean // Using org.springframework.web.reactive.function.server.RouterFunction<ServerResponse>
public RouterFunction<ServerResponse> sendMessageToTopic(KafkaController kafkaController){
return RouterFunctions.route(RequestPredicates.POST("/endpoint"), kafkaController::sendMessage);
}
@Component
public class ResponseHandler {
public getServerResponse() {
return ServerResponse.ok().body(Mono.just(Status.SUCCESS), String.class);
}
}
@Component
public class KafkaController {
public Mono<ServerResponse> auditInvalidTransaction(ServerRequest request) {
return request.bodyToMono(TopicMsgMap.class)
// your HTTP call may not return immediately without this
.subscribeOn(Schedulers.single()) // for a single worker thread
.flatMap(topicMsgMap -> {
MyKafkaPublisher.sendMessages(topicMsgMap);
}.flatMap(responseHandler::getServerResponse);
}
}
@Data // model class just to easily convert the ServerRequest (from json, for ex.)
// + ~@constructors
public class TopicMsgMap() {
private Map<String, String> topicMsgMap;
}
@Service // Using org.springframework.kafka.core.KafkaTemplate<String, String>
public class MyKafkaPublisher {
@Autowired
private KafkaTemplate<String, String> template;
@Value("${topic1}")
private String topic1;
@Value("${topic2}")
private String topic2;
public void sendMessages(Map<String, String> topicMsgMap){
topicMsgMap.forEach((top, msg) -> {
if (topic.equals("topic1") kafkaTemplate.send(topic1, message);
if (topic.equals("topic2") kafkaTemplate.send(topic2, message);
});
}
}
Run Code Online (Sandbox Code Playgroud)
猜测这不是您想要的用例,但希望您发现这个一般结构很有用。
| 归档时间: |
|
| 查看次数: |
2447 次 |
| 最近记录: |