Spring Kafka 监听 Http 请求

Ror*_*son 1 java spring apache-kafka spring-kafka

我正在使用 Spring 和 Kafka,我发出如下所示的 HTTP POST 请求,并通过 Kafka 主题将一些信息发送到另一个服务。

@RequestMapping(method = RequestMethod.POST, value = "/portfolio")
public void getPortfolio(
       Authentication auth,
    @RequestBody User user
) {
    //Data Transfer Object
    UserDTO dto = user.toDTO();
    dto.setId(((AuthenticatedUser) auth.getPrincipal()).getId());

    //Sending message to Kafka topic
    sender.sendPortfolioRequest(dto);
}
Run Code Online (Sandbox Code Playgroud)

然后我想监听不同主题的响应并在 HTTP 响应中返回数据,但我被困在这里。我可以使用下面的侦听器方法来侦听响应,但不知道如何将两者放在一起。

@KafkaListener(
    topics = Topics.PORTFOLIO_RESULT,
    containerFactory = "portfolioKafkaListenerContainerFactory"
)
public void portfolioListener(UserPortfolioDTO portfolio) {
    System.out.println("Recieved Portfolio: " + portfolio.toString());
}
Run Code Online (Sandbox Code Playgroud)

PS 我是使用 HTTP 请求的新手,不知道这是否是我想要实现的目标的正确方法,或者我是否应该使用 POST 创建新资源并重定向到该资源或其他资源。

Art*_*lan 5

这是不能用 来完成的,@KafkaListener因为它是单独启动的,并且完全在自己的线程中工作。同时,您期望在 HTTP 请求线程中得到回复。

这里唯一适合您的解决方案是ConsumerFactory手动使用 Apache Kafka ConsumerConsumer因此,您发送,从工厂获取一个实例,调用它poll()被阻止直到结果,构建 HTTP 响应并关闭Consumer

  • 对于此用例,您可以使用 [`ReplyingKafkaTemplate.sendAndReceive()` 方法](https://docs.spring.io/spring-kafka/reference/html/_reference.html#replying-template);响应系统必须在回复中设置关联 ID,如果它是具有返回值的“@KafkaListener”,则会自动完成。您可以阻止回复或向 future 添加侦听器以异步回复。 (3认同)