emj*_*jay 2 java reactive-programming vert.x reactive
我目前正在从事一个项目,该项目需要外部系统和我将要编写的应用程序(使用Java)之间的TCP通信。众所周知,使用常规NIO可以轻松实现。但是,作为我正在进行的这个新项目的一部分,我必须使用Vert.x来提供TCP通信。请参考下图:
在右边,我的应用程序作为TCP服务器运行,等待来自外部系统的连接,在左边。我读过要创建一个TCP并监听连接,您可以简单地执行以下操作:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
Run Code Online (Sandbox Code Playgroud)
但是,我不知道的一点是当外部系统连接到我的应用程序并通过TCP发送EchoRequestMessages时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为EchoRequestMessage POJO,然后将EchoResponseMessage编码为字节缓冲区,然后发送回外部系统。
我如何使用vert.x-rx对接收到的EchoRequestMessage,其解码,EchoResponseMessage的编码进行反应式编程,然后将其发送回外部系统,所有这些都以一种构建器模式类型设置进行。我已经阅读了有关Observable和订阅的内容,但是我无法弄清楚要观察什么或订阅什么。任何帮助将不胜感激。
要从套接字读取数据,可以使用RecordParser。在套接字连接上,数据通常用换行符分隔:
RecordParser parser = RecordParser.newDelimited("\n", sock);
Run Code Online (Sandbox Code Playgroud)
A RecordParser是Vert.x,ReadStream因此可以将其转换为Flowable:
FlowableHelper.toFlowable(parser)
Run Code Online (Sandbox Code Playgroud)
现在,如果EchoRequestMessage可以从创建一个Buffer:
public class EchoRequestMessage {
private String message;
public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}
public String getMessage() {
return message;
}
}
Run Code Online (Sandbox Code Playgroud)
并EchoResponseMessage转换为Buffer:
public class EchoResponseMessage {
private final String message;
public EchoResponseMessage(String message) {
this.message = message;
}
public Buffer toBuffer() {
// Serialize;
}
}
Run Code Online (Sandbox Code Playgroud)
您可以使用RxJava运算符来实现回显服务器流:
vertx.createNetServer().connectHandler(sock -> {
RecordParser parser = RecordParser.newDelimited("\n", sock);
FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);
}).listen(1234);
Run Code Online (Sandbox Code Playgroud)
[EDIT]如果协议消息中的消息不是用行分隔的,而是以长度为前缀的,则可以创建一个自定义ReadStream:
class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;
private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}
Run Code Online (Sandbox Code Playgroud)
并将其转换为Flowable:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
851 次 |
| 最近记录: |