如何在spring cloud stream和kafka中发送和接收相同的主题

Jim*_*imm 6 spring spring-integration spring-boot spring-cloud-stream spring-cloud-dataflow

我有一个spring-cloud-streamkafka绑定的应用程序.我想从同一个可执行文件(jar)中发送和接收来自同一主题的消息.我有我的频道定义,如下所示: - public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

@StreamListener用来接收消息.我得到各种意想不到的错误.有时,我收到

  1. 没有为每个其他消息找到unknown.message.channel的调度程序
  2. 如果我将命令行kafka订阅者附加到上述论坛主题,它将收到所有其他消息.
  3. 我的应用程序接收所有其他消息,这是来自命令行订户的独占消息集.我确保我的应用程序在特定的组名下订阅.

是否有上述用例的工作示例?

Mar*_*ici 12

这是定义可绑定通道的错误方法(因为forum两者都使用了名称).我们应该更彻底并且快速失败,但是您将输入和输出绑定到同一个通道并在应用程序中创建竞争消费者.这也解释了备用消息的另一个问题.

你应该做的是:

public interface ChannelDefinition { 

   @Input
   public MessageChannel readMessage();

   @Output
   public MessageChannel postMessage();
}
Run Code Online (Sandbox Code Playgroud)

然后使用应用程序属性将您的通道绑定到同一队列:

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
Run Code Online (Sandbox Code Playgroud)