F4k*_*k3d 4 java tcp serversocket spring-integration spring-boot
我试图在现有的 Spring Boot 应用程序中实现一个带有 Spring 集成的 TCP 服务器套接字,但是我遇到了一个问题,这个问题让我发疯......客户端正在向服务器发送一条消息(一个字节数组),并且超时。就是这样。我没有从服务器收到任何异常。看来我提供了错误的端口或其他东西,但检查端口后,我确信它是正确的。
这是我基于注解的配置类:
import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.dsl.TcpInboundGatewaySpec;
import org.springframework.integration.ip.dsl.TcpServerConnectionFactorySpec;
/**
 * Spring Integration configuration that exposes a TCP server socket through the Java DSL.
 *
 * <p>The listening port is read from the {@code socket.port} property. Bytes on the wire are
 * converted by {@link CustomSerializerDeserializer}, and every inbound message is passed to
 * {@link ServerSocketHandler#handleMessage}.
 */
@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {

    /** Port the server socket listens on, injected from the {@code socket.port} property. */
    @Value("${socket.port}")
    private int serverSocketPort;

    /**
     * Builds the inbound flow: server connection factory, then inbound gateway, then handler.
     *
     * @param serverSocketHandler bean whose {@code handleMessage} method receives each message
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
        // Must use the injected field declared above; there is no other port variable in scope.
        TcpServerConnectionFactorySpec connectionFactory =
                Tcp.netServer(serverSocketPort)
                        .deserializer(new CustomSerializerDeserializer())
                        .serializer(new CustomSerializerDeserializer())
                        .soTcpNoDelay(true);

        TcpInboundGatewaySpec inboundGateway =
                Tcp.inboundGateway(connectionFactory);

        return IntegrationFlows
                .from(inboundGateway)
                .handle(serverSocketHandler::handleMessage)
                .get();
    }

    /**
     * Registers the message handler used by {@link #server(ServerSocketHandler)}.
     *
     * @return a new handler instance
     */
    @Bean
    public ServerSocketHandler serverSocketHandler() {
        return new ServerSocketHandler();
    }
}
Run Code Online (Sandbox Code Playgroud)
我想在尝试发送答案之前使接收功能正常工作,所以这就是为什么要进行最小配置。
下面的类应该处理从服务器套接字接收到的消息
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
/**
 * Handles messages received from the TCP server socket.
 */
@Log4j2
public class ServerSocketHandler {

    /**
     * Logs the payload of an incoming message and returns it as text.
     *
     * <p>A {@code byte[]} payload is decoded as UTF-8 so that the actual content is logged and
     * returned. Calling {@code toString()} on an array would only yield its identity string
     * (for example {@code [B@1b2c3d}). Any other payload type is converted with
     * {@code String.valueOf}.
     *
     * @param message        the incoming message
     * @param messageHeaders the headers of the incoming message (currently unused)
     * @return the payload rendered as text
     */
    public String handleMessage(Message<?> message, MessageHeaders messageHeaders) {
        Object payload = message.getPayload();
        String text;
        if (payload instanceof byte[]) {
            text = new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
        } else {
            text = String.valueOf(payload);
        }
        log.info(text);
        // TODO implement something useful to process the incoming message here...
        // NOTE(review): the gateway's serializer is declared as Serializer<byte[]>; confirm that a
        // String reply can be written back before relying on the return value.
        return text;
    }
}
Run Code Online (Sandbox Code Playgroud)
上面的处理程序方法从未被调用过一次!我已经在谷歌上搜索了一些示例实现或教程,但我还没有找到任何对我有用的东西。我已经尝试过这些网站的实现:
还有很多网站......但没有任何帮助我:-(
更新1
我已经实现了自定义序列化器/反序列化器:
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Raw byte-array serializer/deserializer for TCP connections.
 *
 * <p>No framing (length header or delimiter) is applied: a "message" is whatever bytes are
 * readable on the stream at the time {@link #deserialize(InputStream)} is called.
 */
@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>,
        Deserializer<byte[]> {

    /** Size of the scratch buffer used for each read from the stream. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Reads the next chunk of bytes from the stream.
     *
     * <p>The call blocks until at least one byte is available, then drains whatever else is
     * already readable without blocking again. Unlike {@code readAllBytes()}, it does not wait
     * for the peer to close the connection, so a client that keeps the socket open still gets
     * its message delivered.
     *
     * @param inputStream the connection's input stream
     * @return the bytes read; never empty
     * @throws java.io.EOFException if the peer has closed the connection
     * @throws IOException          if reading from the stream fails
     */
    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] chunk = new byte[READ_BUFFER_SIZE];
        int count = inputStream.read(chunk); // blocks for the first byte(s) or end of stream
        if (count < 0) {
            // Signal end of stream explicitly instead of returning an empty message forever.
            throw new java.io.EOFException("Stream closed before any data was received");
        }
        java.io.ByteArrayOutputStream message = new java.io.ByteArrayOutputStream(count);
        message.write(chunk, 0, count);
        while (inputStream.available() > 0) {
            count = inputStream.read(chunk);
            if (count < 0) {
                break;
            }
            message.write(chunk, 0, count);
        }
        return message.toByteArray();
    }

    /**
     * Writes the given bytes to the stream unchanged.
     *
     * @param object       the bytes to send
     * @param outputStream the connection's output stream
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize(byte[] object, OutputStream outputStream) throws IOException {
        outputStream.write(object);
    }
}
Run Code Online (Sandbox Code Playgroud)
客户端发送消息后,自定义反序列化器会被调用,但读到的内容始终为空。我不知道为什么......反序列化器需要很长时间才能从流中读取所有字节,而最后读到的内容却是空的。这个过程一直在重复,所以我想我不小心造成了一个无限循环......
更新2
我已经捕获了客户端和服务器套接字之间的通信:看起来我陷入了握手阶段,因此没有有效负载......
因此,如果有人可以帮助我解决这个问题,我将非常感激,如果您需要更多信息,请告诉我。
提前致谢!
好吧,经过几天的分析和编码,我找到了使用 spring 集成处理 TCP 套接字通信的最佳解决方案。对于遇到同样问题的其他开发人员。这是我到目前为止所做的。
这个类包含一个对我来说可行的、基于注解的 TCP 套接字连接配置:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;
/**
* Spring annotation based configuration
*/
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {
public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
@Value("${socket.port}")
private int socketPort;
/**
* Reply messages are routed to the connection only if the reply contains the ip_connectionId header
* that was inserted into the original message by the connection factory.
*/
@MessagingGateway(defaultRequestChannel = "toTcp")
public interface Gateway {
void send(String message, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public AbstractServerConnectionFactory serverCF() {
TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
serverCf.setSerializer(SERIALIZER);
serverCf.setDeserializer(SERIALIZER);
serverCf.setSoTcpNoDelay(true);
serverCf.setSoKeepAlive(true);
// serverCf.setSingleUse(true);
// final int soTimeout = 5000;
// serverCf.setSoTimeout(soTimeout);
return serverCf;
}
@Bean
public AbstractClientConnectionFactory clientCF() {
TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost", socketPort);
clientCf.setSerializer(SERIALIZER);
clientCf.setDeserializer(SERIALIZER);
clientCf.setSoTcpNoDelay(true);
clientCf.setSoKeepAlive(true);
// clientCf.setSingleUse(true);
// final int soTimeout = 5000;
// clientCf.setSoTimeout(soTimeout);
return clientCf;
}
@Bean
public TcpInboundGateway tcpInGate() {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(serverCF());
inGate.setRequestChannel(fromTcp());
inGate.setReplyChannel(toTcp());
return inGate;
}
@Bean
public TcpOutboundGateway tcpOutGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(clientCF());
outGate.setReplyChannel(toTcp());
return outGate;
}
Run Code Online (Sandbox Code Playgroud)
此类包含自定义序列化器和反序列化器
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
/**
* A custom serializer for incoming and/or outcoming messages.
*/
/**
 * A custom serializer/deserializer for incoming and outgoing raw byte-array messages.
 *
 * <p>No framing (length header or delimiter) is applied: a "message" is whatever bytes are
 * readable on the stream at the time {@link #deserialize(InputStream)} is called.
 */
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {

    /** Size of the scratch buffer used for each read from the stream. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Reads the next chunk of bytes from the stream.
     *
     * <p>The call blocks until at least one byte is available, then drains whatever else
     * {@code available()} reports without blocking again. Returning an empty array when nothing
     * is available would make a caller that polls in a loop spin on an idle connection, and
     * {@code readAllBytes()} would wait until the peer closes the socket.
     *
     * @param inputStream the connection's input stream
     * @return the bytes read; never empty
     * @throws java.io.EOFException if the peer has closed the connection
     * @throws IOException          if reading from the stream fails
     */
    @NotNull
    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] chunk = new byte[READ_BUFFER_SIZE];
        int count = inputStream.read(chunk); // blocks for the first byte(s) or end of stream
        if (count < 0) {
            // Signal end of stream explicitly so the connection can be torn down.
            throw new java.io.EOFException("Stream closed before any data was received");
        }
        java.io.ByteArrayOutputStream collected = new java.io.ByteArrayOutputStream(count);
        collected.write(chunk, 0, count);
        while (inputStream.available() > 0) {
            count = inputStream.read(chunk);
            if (count < 0) {
                break;
            }
            collected.write(chunk, 0, count);
        }
        byte[] message = collected.toByteArray();
        log.debug("Deserialized message {}", new String(message, StandardCharsets.UTF_8));
        return message;
    }

    /**
     * Writes the given bytes to the stream unchanged and flushes it.
     *
     * @param message      the bytes to send
     * @param outputStream the connection's output stream
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize(@NotNull byte[] message, OutputStream outputStream) throws IOException {
        log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }
}
Run Code Online (Sandbox Code Playgroud)
在下面的类中,您可以实现一些业务逻辑来处理传入的...
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
/**
 * Message endpoint for messages arriving from the TCP inbound gateway.
 *
 * <p>NOTE(review): this class has the same name as the endpoint for outgoing messages on
 * {@code toTcp}. If both live in one application context, rename one of them to avoid a clash.
 */
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    /**
     * Handles a payload received on {@code fromTcp}, the request channel of the inbound gateway,
     * and returns the reply payload.
     *
     * @param msg the received bytes
     * @return the reply bytes; currently the input is returned unchanged
     */
    @ServiceActivator(inputChannel = "fromTcp")
    public byte[] handleMessage(byte[] msg) {
        // TODO implement some business logic here
        return msg;
    }
}
Run Code Online (Sandbox Code Playgroud)
和传出的消息。
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
/**
 * Message endpoint subscribed to the {@code toTcp} channel.
 */
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    /**
     * Receives a payload from {@code toTcp} and hands it back unchanged.
     *
     * @param msg the bytes taken from the channel
     * @return the same bytes that were passed in
     */
    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // Placeholder for business logic; until then the payload simply passes through.
        final byte[] passThrough = msg;
        return passThrough;
    }
}
Run Code Online (Sandbox Code Playgroud)
希望能帮助到你。;-)
| 归档时间: |
|
| 查看次数: |
19081 次 |
| 最近记录: |