我可以使用Spring WebFlux来实现通过Kafka请求/响应主题获取数据的REST服务吗?

Jav*_*vaD 5 spring reactive-programming apache-kafka spring-boot spring-webflux

我正在开发 REST 服务,该服务反过来会查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还期望大量负载,因此我正在考虑异步/非阻塞方法,以避免数百个“servlet”线程在调用慢速系统时被阻塞。

据我所知,这可以使用新 servlet API 规范中存在的 AsyncContext 来实现。我什至开发了一个小型原型,它似乎正在工作。

另一方面,看起来我可以使用 Spring WebFlux 实现相同的目标。不幸的是,我没有找到任何使用 Mono/Flux 包装自定义“后端”调用的示例。大多数示例只是重用已经准备好的反应式连接器,例如 ReactiveCassandraOperations.java 等。

我的数据流如下:

JS 客户端 --> Spring RestController --> 向 Kafka 主题发送请求 --> 从 Kafka 回复主题读取响应 --> 返回数据给客户端

我可以将 Kafka 步骤封装到 Mono/Flux 中吗?如何做到这一点?我的 RestController 方法应该是什么样子?

这是我的简单实现,它使用 Servlet 3.1 API 实现了相同的效果

//took the idea from some Jetty examples
public class AsyncRestServlet extends HttpServlet {
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

    String result = (String) req.getAttribute(RESULTS_ATTR);

    if (result == null) { //data not ready yet: schedule async processing
        final AsyncContext async = req.startAsync();

        //generate some unique request ID
        String uid = "req-" + String.valueOf(req.hashCode()); 

        //share it to Kafka receive together with AsyncContext
        //when Kafka receiver will get the response it will put it in Servlet request attribute and call async.dispatch()
        //This doGet() method will be called again and it will send the response to client
        receiver.rememberKey(uid, async); 

        //send request to Kafka
        sender.send(uid, param); 

        //data is not ready yet so we are releasing Servlet thread
        return;
    }

    //return result as html response
    resp.setContentType("text/html");
    PrintWriter out = resp.getWriter();
    out.println(result);
    out.close();
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*yer 0

这是一个简短的示例 - 不是您可能想到的 WebFlux 客户端,但至少它可以让您利用 Flux 和 Mono 进行异步处理,我将其解释为您问题的重点。Web 对象应该无需额外配置即可工作,但当然您需要配置 Kafka,因为 KafkaTemplate 对象无法单独工作。

    @Bean // Using org.springframework.web.reactive.function.server.RouterFunction<ServerResponse>
    public RouterFunction<ServerResponse> sendMessageToTopic(KafkaController kafkaController){
        return RouterFunctions.route(RequestPredicates.POST("/endpoint"), kafkaController::sendMessage);
    }

    @Component
    public class ResponseHandler {
        public getServerResponse() {
            return ServerResponse.ok().body(Mono.just(Status.SUCCESS), String.class);
        }
    }

    @Component 
    public class KafkaController {
        public Mono<ServerResponse> auditInvalidTransaction(ServerRequest request) {
            return request.bodyToMono(TopicMsgMap.class) 
                // your HTTP call may not return immediately without this
                .subscribeOn(Schedulers.single()) // for a single worker thread
                .flatMap(topicMsgMap -> {
                    MyKafkaPublisher.sendMessages(topicMsgMap);
                }.flatMap(responseHandler::getServerResponse);
        }
    }

    @Data // model class just to easily convert the ServerRequest (from json, for ex.)
    // + ~@constructors
    public class TopicMsgMap() {
        private Map<String, String> topicMsgMap;
    }

    @Service // Using org.springframework.kafka.core.KafkaTemplate<String, String>
    public class MyKafkaPublisher {
        @Autowired
        private KafkaTemplate<String, String> template;

        @Value("${topic1}")
        private String topic1;
        @Value("${topic2}")
        private String topic2;

        public void sendMessages(Map<String, String> topicMsgMap){
            topicMsgMap.forEach((top, msg) -> {
                if (topic.equals("topic1") kafkaTemplate.send(topic1, message);
                if (topic.equals("topic2") kafkaTemplate.send(topic2, message);
            });
        }
    }
Run Code Online (Sandbox Code Playgroud)

猜测这不是您想要的用例,但希望您发现这个一般结构很有用。