标签: spring-integration-dsl

如何使用MockIntegrationContext.substituteMessageHandlerFor模拟WebFluxRequestExecutingMessageHandler

我已经实现了IntegrationFlow我想要执行以下任务的位置:

  1. 轮询目录中的文件
  2. 将文件内容转换为字符串
  3. 将字符串发送WebFluxRequestExecutingMessageHandler到REST-Endpoint并使用a AdviceChain来处理成功和错误响应

履行

@Configuration
@Slf4j
public class JsonToRestIntegration {

    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Value("${rest-service-url}")
    private String restServiceUrl;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel successChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel failureChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

   @Bean
public IntegrationFlow jsonFileToRestFlow() {
    return …
Run Code Online (Sandbox Code Playgroud)

java integration-testing spring-integration spring-integration-dsl

8
推荐指数
1
解决办法
432
查看次数

TaskExecutor 不工作 Spring Integration

我已经用任务执行器设置了文件轮询器

ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))


                    //move file to processing first processing                    
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();
Run Code Online (Sandbox Code Playgroud)

正如所见,我已将threadpool每次轮询设置为固定的 10 条消息和最多 10 条消息。如果我放了 10 个文件,它仍然会一一处理。这里有什么问题?

* 更新 *

尽管我现在有其他问题,但在加里的回答之后它工作得很好。

我已经像这样设置了我的轮询器

setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);
Run Code Online (Sandbox Code Playgroud)

使用的原因是AcceptAll因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为AcceptAllFile

如果我更改AcceptOnceFileListFilter它可以工作,但是再次出现的同一个文件将不会再次被拾取!可以做些什么来避免这个问题?

问题/错误

在课堂上AbstractPersistentAcceptOnceFileListFilter我们有这个代码

@Override
    public boolean accept(F file) …
Run Code Online (Sandbox Code Playgroud)

java spring-integration spring-integration-dsl

6
推荐指数
1
解决办法
1704
查看次数

Spring Integration 5.1 - integration flow convertion with @IntegrationConverter doesn't work

I upgrade my Spring boot version from 2.0.5.RELEASE to 2.1.8.RELEASE (so Spring Integration from 5.0 to 5.1) and the automatic type casting inside integration flow doesn't work anymore. I am used to define a set of @IntegrationConverter components and automatic casting with the operation transform(Type.class, p -> p) inside integration flow code but with the new version it seems to be broken.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from …
Run Code Online (Sandbox Code Playgroud)

java upgrade spring-integration spring-boot spring-integration-dsl

6
推荐指数
1
解决办法
54
查看次数

Spring 集成错误“没有可用的输出通道或回复通道标头”

我不确定为什么会出现异常

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Run Code Online (Sandbox Code Playgroud)

它只是一个简单的 IntegrationFlow,但不确定我在下面的代码中缺少什么。

  @Bean
  Exchange messageExchange() {
    return ExchangeBuilder
        .directExchange("attr")
        .durable(true)
        .build();
  }

  @Bean
  Queue queue() {
    return QueueBuilder
        .durable("attr_queue")
        .build();
  }

  @Bean
  Binding binding() {
    return BindingBuilder
        .bind(queue())
        .to(messageExchange())
        .with("attr_queue")
        .noargs();
  }

  @Bean
  IntegrationFlow deltaFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp
        .inboundAdapter(connectionFactory, queue()))
        .handle(String.class, (payload, headers) -> {
          if (payload.isEmpty()) {
            log.info("Payload empty");
          } else {
            log.info("Payload : " + payload);
          }
          return payload;
        })
        .get();
  }
Run Code Online (Sandbox Code Playgroud)

我试图接触 Spring Integration,但不确定为什么我会收到此异常。我要做的就是使用 读取队列并将其inboundAdapter …

spring spring-integration spring-integration-amqp spring-integration-dsl

5
推荐指数
1
解决办法
4277
查看次数

使用 Spring Integration DSL 读取 Tibco EMS 主题

我一直在尝试配置 Spring Integration dsl 以从 Tibco EMS 主题读取,对收到的消息进行一些处理,然后将其推送到 ActiveMQ 队列。我能够使用 XML 配置成功地设置它,但想改用 spring 集成 dsl。我无法弄清楚,也无法在网上找到任何相关帮助。

我将消息推送到 ActiveMQ 的配置是这样的 -

@Bean
public IntegrationFlow toActiveMQFlow(
        MessageChannel channel,
        ActiveMQQueue queue,
        CachingConnectionFactory cachingConnectionFactory) {
    return IntegrationFlows.from(channel)
            .transform(Object::toString)
            .handle(Jms.outboundAdapter(cachingConnectionFactory).destination(queue))
            .get();
}
Run Code Online (Sandbox Code Playgroud)

我认为从 Tibco EMS 主题读取的配置应该是这样的 -

@Bean
public IntegrationFlow fromTibcoTopicFlow(
        MessageChannel channel,
        ConnectionFactory tibcoEmsConnectionFactory,
        Topic tibcoTopic
) {
    return IntegrationFlows
            .from(SomeInboundAdapter(tibcoEmsConnectionFactory).destination(tibcoTopic))
            .transform(Object::toString)
            .channel(channel)
            .get();
}
Run Code Online (Sandbox Code Playgroud)

由于我在后一种配置上没有找到太多帮助,因此求助于 XML 配置是我唯一的选择吗?

请纠正/编辑/指出我仍在学习 Spring Integration DSL 时所犯的任何错误。

感谢你的帮助!

spring tibco-ems spring-integration spring-integration-dsl

5
推荐指数
1
解决办法
1066
查看次数

如何在 Spring Cloud Stream 项目中将传入的标头映射为 String 而不是 byte[]?

我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].

这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。

我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。

我显然在这里遗漏了一些东西。有小费吗?

spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl

5
推荐指数
1
解决办法
1851
查看次数

Spring Integration DSL Filter 与带有单个收件人和 DefaultOutputToParentFlow 的 RouteToRecipients

当给定的评估返回 false 时,我需要将消息从父流路由到新流,但当该评估返回 true 时,让它继续在父流中。目前我已经能够使用 Spring Integration DSL.filter()方法成功实现此功能,没有任何问题。但是,我觉得.filter()这样使用似乎并不符合该方法的真正意图。是否有某种类型的路由器可以用来更好地实现同样的需求?是否有必要从这种实现更改.filter()为基于路由器的实现?

以下面的集成流程配置为例......

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows
            .from("inboundChannel")
            .filter(someService::someTrueFalseMethod, onFalseReturn -> onFalseReturn.discardChannel("otherFlowInboundChannel"))
            .handle(someService::someHandleMethod)
            .get();
}

@Bean
public IntegrationFlow otherFlow() {
    return IntegrationFlows
            .from("otherFlowInboundChannel")
            .handle(someOtherService::someOtherHandleMethod)
            .get();
}
Run Code Online (Sandbox Code Playgroud)

到目前为止,这似乎.routeToRecipents()可能是我需要使用的。在我的场景中,我需要评估消息的标头,这就是recipientMessageSelector使用它的原因。

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows
            .from("inboundChannel"
            .routeToRecipients(router -> router
                .recipientMessageSelector("otherFlowInboundChannel", someService::someTrueFalseMethod)
                .defaultOutputToParentFlow()
            )
            .handle(someService::someHandleMethod)
            .get();
}

@Bean
public IntegrationFlow otherFlow() {
    return IntegrationFlows
            .from("otherFlowInboundChannel")
            .handle(someOtherService::someOtherHandleMethod)
            .get();
}
Run Code Online (Sandbox Code Playgroud)

即使这个routeToRecipients解决方案似乎有效,它和上面的过滤器实现之间真的有什么好处吗?

spring spring-integration spring-boot spring-integration-dsl

5
推荐指数
1
解决办法
2173
查看次数

有一些方法可以生成弹簧集成dsl流的图表吗?

我有一个正在运行的应用程序,它定义了入站/出站网关,分离器,聚合器,路由器等的一些弹簧集成流程......

这些流都是使用spring integration dsl和annotations创建的......所以没有XML.

有没有可以生成EE模式图的工具?

这个问题表明Intellij可以为xml配置做到这一点......我想要一些类似于dsl IntegrationFlow的东西

java spring-integration spring-integration-dsl

4
推荐指数
1
解决办法
2108
查看次数

使用 Spring Integration 向 ActiveMQ Artemis 主题发送消息

目标

我想向某个主题发送一条消息,稍后我将使用客户端应用程序处理该消息。为此,我使用 Spring Boot 和 Spring Integration Java DSL 及其 JMS 模块。作为消息代理,我使用本机 ActiveMQ Artemis。


这是我的设置

演示应用程序.java

@SpringBootApplication
public class DemoApplication {

    private static final Logger logger = LoggerFactory.getLogger(DemoApplication.class);

    public interface StarGate {
        void sendHello(String helloText);
    }

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows
                .from(StarGate.class)
                .handle(Jms.outboundAdapter(connectionFactory)
                        .configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec
                                .deliveryPersistent(true)
                                .pubSubDomain(true)
                                .sessionTransacted(true)
                                .sessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
                                .explicitQosEnabled(true)
                        )
                        .destination(new ActiveMQTopic("wormhole")))
                .get();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
        StarGate stargate = context.getBean(StarGate.class);
        stargate.sendHello("Jaffa, kree!");
        logger.info("Hello …
Run Code Online (Sandbox Code Playgroud)

jms spring-integration activemq-artemis spring-integration-dsl

4
推荐指数
1
解决办法
2533
查看次数

如何在 Spring 集成测试中调用通道

我有一个接受字符串输入的 Flow

  @Bean
  public IntegrationFlow myFlow() {
        // @formatter:off
        return IntegrationFlows.from("some.input.channel")
                               .handle(someService)
                               .get();
Run Code Online (Sandbox Code Playgroud)

我如何从我的集成测试中调用它,如何在“some.input.channel”上放置一个字符串消息

spring-integration spring-integration-dsl

3
推荐指数
1
解决办法
2370
查看次数