如何使用Apache Camel Netty4以异步模式通过已建立的TCP连接发回响应?

Ric*_*iti 6 java apache-camel netty spring-boot

我正在消费者模式下使用Netty4组件(http://camel.apache.org/netty4.html)构建一个具有Apache Camel路由的微服务.所以,在我的微服务中,我正在构建的这条路由将通过TCP连接接收消息.为此,我这样做了:

@Override
public void configure() throws Exception {
 this.from("netty4:tcp://localhost:7000?textline=true&encoding=utf8")
   .process(new Processor() {
      @Override
      public void process(final Exchange exchange) throws Exception {
        log.info("[Processor] - Incoming Message -> {}", exchange.getIn().getBody(String.class));
      }
   }).to("bean:messageService");
}
Run Code Online (Sandbox Code Playgroud)

好吧,我正常收到这些消息.要测试,我使用telnet:

$ telnet localhost 7000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
TheMessage
Run Code Online (Sandbox Code Playgroud)

问题是当我想将消息发送回在该路由中建立的同一TCP通道时.在同步模式下,我可以使用Exchange对象轻松完成.但是,在异步模式下,我不知道如何将消息发送回生产者.

接收并应发送消息的Spring Service是:

@Service
public class MessageService {

  private static final Logger log = LoggerFactory.getLogger(MessageService.class);

  private List<String> messageStore = new LinkedList<>();

  public void sendToTCP(final String message) {
    log.info("[Service] - Sending Message over TCP Channel --> {}", message);
  }

  @Handler
  public void receiveFromTCP(final Exchange exchange) {
    final String messageFromTcp = exchange.getIn().getBody(String.class);
    log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
    this.messageStore.add(messageFromTcp);
  }

  public List<String> getReceivedMessages() {
    return messageStore;
  }
}
Run Code Online (Sandbox Code Playgroud)

在简历中,我需要的是在这个方法中加入一些代码,类似于:

public void sendToTCP(final String message) {
  log.info("[Service] - Sending Message over TCP Channel --> {}", message);
  // Send message to producer here
  camelContext.createProducerTemplate.send....
}
Run Code Online (Sandbox Code Playgroud)

我不能创建另一条生产者的路线,因为我不知道生产者IP.我真的需要使用生成器和我的应用程序之间已经建立的TCP通道.通信需要通过TCP,其他工具(如队列)不是一种选择.


GitHub示例项目

我在GitHub上传了一个示例项目:https://github.com/rgiaviti/so-camel-netty4-tcp

这是通信TCP通信的绘制

我正在使用:

  • Spring Boot 1.5.12;
  • Apache Camel 2.21.0;

Ric*_*iti 1

根据@vikram-palakurthi 的评论找到了解决方案。

我使用了Netty4属性reuseChannel。解决方案代码与此接近:

  private static final Logger log = LoggerFactory.getLogger(MessageService.class);

  private List<String> messageStore = new LinkedList<>();
  private Channel openedChannel;

  public void sendToTCP(final String message) {
    log.info("[Service] - Sending Message over TCP Channel --> {}", message);
    log.info("[Service] - Channel is Active? {}", this.openedChannel.isActive());
    log.info("[Service] - Channel is Open? {}", this.openedChannel.isOpen());
    log.info("[Service] - Channel is Writeble? {}", this.openedChannel.isWritable());

    this.openedChannel.write(message);
    this.openedChannel.flush();
  }

  @Handler
  public void receiveFromTCP(final Exchange exchange) {
    this.openedChannel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
    final String messageFromTcp = exchange.getIn().getBody(String.class);
    log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
    this.messageStore.add(messageFromTcp);
  }
Run Code Online (Sandbox Code Playgroud)

骆驼方案

Camel Routes、处理器...的完整解决方案可以在这里找到: https: //github.com/rgiaviti/so-camel-netty4-tcp/tree/solution

然而,我们需要知道这是一个简单的解决方案。开发者仍然需要处理多个通道、由生产者关闭通道、检查通道......

顶点解决方案

我也开发了 Vertx 解决方案。 https://github.com/rgiaviti/so-camel-netty4-tcp/tree/vertx-solution