如何使用vert.x-rx创建反应性客户端-服务器TCP通信

emj*_*jay 2 java reactive-programming vert.x reactive

我目前正在从事一个项目,该项目需要外部系统和我将要编写的应用程序(使用Java)之间的TCP通信。众所周知,使用常规NIO可以轻松实现。但是,作为我正在进行的这个新项目的一部分,我必须使用Vert.x来提供TCP通信。请参考下图:

在此处输入图片说明

在右边,我的应用程序作为TCP服务器运行,等待来自外部系统的连接,在左边。我读过要创建一个TCP并监听连接,您可以简单地执行以下操作:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});
Run Code Online (Sandbox Code Playgroud)

但是,我不知道的一点是当外部系统连接到我的应用程序并通过TCP发送EchoRequestMessages时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为EchoRequestMessage POJO,然后将EchoResponseMessage编码为字节缓冲区,然后发送回外部系统。

我如何使用vert.x-rx对接收到的EchoRequestMessage,其解码,EchoResponseMessage的编码进行反应式编程,然后将其发送回外部系统,所有这些都以一种构建器模式类型设置进行。我已经阅读了有关Observable和订阅的内容,但是我无法弄清楚要观察什么或订阅什么。任何帮助将不胜感激。

tse*_*ont 5

要从套接字读取数据,可以使用RecordParser。在套接字连接上,数据通常用换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);
Run Code Online (Sandbox Code Playgroud)

A RecordParser是Vert.x,ReadStream因此可以将其转换为Flowable

FlowableHelper.toFlowable(parser)
Run Code Online (Sandbox Code Playgroud)

现在,如果EchoRequestMessage可以从创建一个Buffer

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }
Run Code Online (Sandbox Code Playgroud)

EchoResponseMessage转换为Buffer

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}
Run Code Online (Sandbox Code Playgroud)

您可以使用RxJava运算符来实现回显服务器流:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);
Run Code Online (Sandbox Code Playgroud)

[EDIT]如果协议消息中的消息不是用行分隔的,而是以长度为前缀的,则可以创建一个自定义ReadStream

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}
Run Code Online (Sandbox Code Playgroud)

并将其转换为Flowable

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
Run Code Online (Sandbox Code Playgroud)