用于 SFTP 出站(带删除)的 Spring Integration DSL

Tri*_*tan 5 java spring spring-boot spring-integration-sftp

我在用着

  • Sprint 集成(文件、SFTP 等)4.3.6
  • 春季启动1.4.3
  • Spring 集成 Java DSL 1.1.4

我正在尝试设置一个 SFTP 出站适配器,该适配器允许我将文件移动到远程系统上的目录,并删除或重命名本地系统中的文件。

因此,例如,我想将文件a.txt放在本地目录中,并将其通过 SFTP 传输到目录inbound中的远程服务器。传输完成后,我想要a.txt的本地副本删除或重命名

我正在考虑几种方法。所以这是我用于测试的常用SessionFactory。

protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
    DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
    sessionFactory.setHost("localhost");
    sessionFactory.setUser("user");
    sessionFactory.setAllowUnknownKeys(true);
    sessionFactory.setPassword("pass");
    CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
    return cachingSessionFactory;
}
Run Code Online (Sandbox Code Playgroud)

这是一个转换器,我必须将一些标头添加到消息中

@Override
public Message<File> transform(Message<File> source) {
    System.out.println("here is the thing : "+source);
    File file = (File)source.getPayload();
    Message<File> transformedMessage = MessageBuilder.withPayload(file)
            .copyHeaders(source.getHeaders())
            .setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
            .setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
            .build();
    return transformedMessage;
}
Run Code Online (Sandbox Code Playgroud)

然后,我有一个集成流程,它使用轮询器来监视本地目录并调用它:

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/")
            )
            .get();
}
Run Code Online (Sandbox Code Playgroud)

这工作正常,但会留下本地文件。关于如何在上传完成后删除本地文件有什么想法吗?我应该看一下SftpOutboundGateway吗?

提前致谢!

Artem 的回答非常有效!这是一个在推送本地文件后删除本地文件的简单示例。

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
            )
            .get();
}

@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpression("payload.delete()");
    advice.setOnFailureExpression("payload + ' failed to upload'");
    advice.setTrapException(true);
    return advice;
}
Run Code Online (Sandbox Code Playgroud)

Art*_*lan 1

为此,您可以使用多种方法。

所有这些都是基于这样一个事实:您对 的原始请求消息执行了其他操作Sftp.outboundAdapter()

  1. .publishSubscribeChannel()允许您向多个订阅者发送相同的消息,当第二个订阅者收到该消息时,只有第一个订阅者完成其工作。默认情况下,如果你不指定Executor

  2. routeToRecipients()允许您通过不同的组件获得相同的结果。

  3. ExpressionEvaluatingRequestHandlerAdvice- 将其添加到端点定义.advice()Sftp.outboundAdapter()第二个参数中并通过以下方式.handle()执行:file.delete()onSuccessExpression

    .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
    
    Run Code Online (Sandbox Code Playgroud)