使用 Spring Integration DSL,如何配置不同集成之间的 I/O?

Pic*_*les 2 java spring-integration spring-boot spring-integration-dsl

我有一个 Spring Boot 应用程序,我正在尝试将两个不同的 Spring Integration 服务连接在一起。我有一个 WebSocket 侦听器和一个使用这两个集成的可用 DSL 示例定义的 TCP 侦听器。

但是,我一直无法找到有关如何将它们连接在一起的文档。换句话说,当浏览器客户端打开 WebSocket 连接,并且从另一个系统接受传入的 TCP 连接时,我只想以双向能力盲目地将所有输入和输出从一个系统传递到另一个系统。将其视为两个协议之间的简单 telnet 桥,使用我的 Spring Boot 应用程序作为桥。

使用 Spring Integration DSL,这种直接连接似乎应该非常简单,但我甚至找不到桥接多个连接的示例,即使使用注解式接口也是如此。每当我从任一接口发送消息时,都会看到 "Dispatcher has no subscribers"(调度程序没有订阅者)错误。

是我把问题想得过于复杂了,还是我需要自己编写一些业务逻辑代码来进行双向转发?

Gar*_*ell 5

这是一个简单的 TCP<->TCP 桥...

/**
 * Spring Boot entry point and configuration for a TCP-to-TCP bridge.
 *
 * <p>Declares a TCP server connection factory (port 1234), a TCP client connection
 * factory (localhost:1235), and one inbound plus one outbound integration flow for
 * each of them. Inbound payloads are handed to the bean named {@code "service"};
 * outbound flows start from the {@code ServerGateway} / {@code ClientGateway}
 * interfaces.
 */
@SpringBootApplication
public class So66993561Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(So66993561Application.class, args);
    }

    /**
     * Server-side connection factory listening on port 1234.
     * The same {@code TcpCodecs.lf()} codec is used for reading and writing.
     */
    @Bean
    AbstractServerConnectionFactory server() {
        return Tcp.netServer(1234)
                .deserializer(TcpCodecs.lf())
                .serializer(TcpCodecs.lf())
                .get();
    }

    /**
     * Client-side connection factory targeting {@code localhost:1235}.
     * The same {@code TcpCodecs.lf()} codec is used for reading and writing.
     */
    @Bean
    AbstractClientConnectionFactory client() {
        return Tcp.netClient("localhost", 1235)
                .deserializer(TcpCodecs.lf())
                .serializer(TcpCodecs.lf())
                .get();
    }

    /**
     * Flow for data arriving on the server factory: the payload is transformed with
     * {@code Transformers.objectToString()} and passed to {@code sendToNCL} on the
     * bean named {@code "service"}.
     *
     * @param server the server connection factory the inbound adapter is attached to
     */
    @Bean
    IntegrationFlow serverInbound(AbstractServerConnectionFactory server) {
        return IntegrationFlows.from(Tcp.inboundAdapter(server))
                .transform(Transformers.objectToString())
                .handle("service", "sendToNCL")
                .get();
    }

    /**
     * Flow that starts at {@code ServerGateway} and ends in an outbound adapter on the
     * server factory.
     *
     * @param server the server connection factory the outbound adapter writes through
     */
    @Bean
    IntegrationFlow serverOutbound(AbstractServerConnectionFactory server) {
        return IntegrationFlows.from(ServerGateway.class)
                .handle(Tcp.outboundAdapter(server))
                .get();
    }

    /**
     * Flow for data arriving on the client factory: the payload is transformed with
     * {@code Transformers.objectToString()} and passed to {@code broadcastToClients}
     * on the bean named {@code "service"}.
     *
     * @param client the client connection factory the inbound adapter is attached to
     */
    @Bean
    IntegrationFlow clientInbound(AbstractClientConnectionFactory client) {
        return IntegrationFlows.from(Tcp.inboundAdapter(client))
                .transform(Transformers.objectToString())
                .handle("service", "broadcastToClients")
                .get();
    }

    /**
     * Flow that starts at {@code ClientGateway} and ends in an outbound adapter on the
     * client factory.
     *
     * @param client the client connection factory the outbound adapter writes through
     */
    @Bean
    IntegrationFlow clientOutbound(AbstractClientConnectionFactory client) {
        return IntegrationFlows.from(ClientGateway.class)
                .handle(Tcp.outboundAdapter(client))
                .get();
    }

    /**
     * Startup runner that prints a prompt, blocks until input is available on stdin,
     * and then requests a connection from the client factory.
     * Declared to depend on the {@code clientOutbound} bean.
     *
     * @param client the client connection factory whose connection is opened
     */
    @Bean
    @DependsOn("clientOutbound")
    public ApplicationRunner runner(AbstractClientConnectionFactory client) {
        return args -> {
            System.out.println("Hit enter when netcat is running");
            System.in.read(); // nc -l 1235
            client.getConnection(); // just open the connection.
        };
    }

}

/**
 * Gateway interface that the {@code serverOutbound} flow starts from.
 */
interface ServerGateway {

    /**
     * Sends {@code out} through the flow, addressed to one specific connection.
     *
     * <p>The message payload is taken from the first argument ({@code #args[0]}) and the
     * {@code IpHeaders.CONNECTION_ID} header from the second ({@code #args[1]}).
     *
     * @param out          the text to send; becomes the message payload
     * @param connectionId the value placed in the {@code IpHeaders.CONNECTION_ID} header
     */
    @Gateway(payloadExpression = "#args[0]",
            headers = @GatewayHeader(name = IpHeaders.CONNECTION_ID, expression = "#args[1]" ))
    void send(String out, String connectionId);

}

/**
 * Gateway interface that the {@code clientOutbound} flow starts from.
 */
interface ClientGateway {

    /**
     * Sends {@code out} through the flow that ends in the client-side outbound adapter.
     *
     * @param out the text to send
     */
    void send(String out);

}

/**
 * Relays text between the connections opened on the connection factory named
 * {@code "server"} and the outbound client connection.
 *
 * <p>Connection open/close events keep two pieces of state current: the set of ids of
 * connections opened on the {@code "server"} factory, and a flag recording whether a
 * non-server connection is currently considered open.
 */
@Component
@DependsOn({ "clientOutbound", "serverOutbound" })
class Service {

    /** Ids of connections reported open on the "server" factory. */
    private final Set<String> serverConnectionIds = ConcurrentHashMap.newKeySet();

    private final ClientGateway clientGateway;

    private final ServerGateway serverGateway;

    /** Set on a non-server open event, cleared on a "client" close event. */
    private volatile boolean clientConnected;

    /**
     * @param client gateway used to write towards the client-side connection
     * @param server gateway used to write to an individual server-side connection
     */
    public Service(ClientGateway client, ServerGateway server) {
        this.clientGateway = client;
        this.serverGateway = server;
    }

    /**
     * Logs the incoming text and sends it to every tracked server-side connection.
     * A failure for one connection is printed and does not stop the remaining sends.
     *
     * @param in the text to fan out
     */
    public void broadcastToClients(String in) {
        System.out.println("received from server: " + in);
        for (String connectionId : this.serverConnectionIds) {
            try {
                this.serverGateway.send(in, connectionId);
            }
            catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Logs the incoming text and forwards it through the client gateway, but only
     * while the client connection is flagged as open. A send failure is printed.
     *
     * @param in the text to forward
     */
    public void sendToNCL(String in) {
        System.out.println("received from a client: " + in);
        if (!this.clientConnected) {
            return;
        }
        try {
            this.clientGateway.send(in);
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Records a newly opened connection: an event from the factory named "server"
     * adds its connection id to the tracked set; any other event marks the client
     * connection as open.
     *
     * @param event the connection-open event
     */
    @EventListener
    public void opens(TcpConnectionOpenEvent event) {
        System.out.println(event);
        boolean fromServerFactory = event.getConnectionFactoryName().equals("server");
        if (!fromServerFactory) {
            this.clientConnected = true;
            return;
        }
        this.serverConnectionIds.add(event.getConnectionId());
    }

    /**
     * Records a closed connection: the connection id is dropped from the tracked set,
     * and an event from the factory named "client" clears the open flag.
     *
     * @param event the connection-close event
     */
    @EventListener
    public void closes(TcpConnectionCloseEvent event) {
        System.out.println(event);
        this.serverConnectionIds.remove(event.getConnectionId());
        boolean fromClientFactory = event.getConnectionFactoryName().equals("client");
        if (fromClientFactory) {
            this.clientConnected = false;
        }
    }

}
Run Code Online (Sandbox Code Playgroud)
$ nc -l 1235
foo <<<<<<<<<<<<<<<<< received
bar <<<<<<<<<<<<<<<<< sent
Run Code Online (Sandbox Code Playgroud)
$ nc localhost 1234
foo <<<<<<<<<<<<<<<<< sent
bar <<<<<<<<<<<<<<<<< received
Run Code Online (Sandbox Code Playgroud)

编辑

添加 websocket 服务器...

$ nc -l 1235
foo <<<<<<<<<<<<<<<<< received
bar <<<<<<<<<<<<<<<<< sent
Run Code Online (Sandbox Code Playgroud)
$ websocat ws://localhost:8080/relay/websocket
foo <<<<<<<<<<<<<<<< received from nc -l connection
baz <<<<<<<<<<<<<<<< sent to nc -l connection
Run Code Online (Sandbox Code Playgroud)