我已经实现了IntegrationFlow我想要执行以下任务的位置:
WebFluxRequestExecutingMessageHandler到REST-Endpoint并使用a AdviceChain来处理成功和错误响应履行
@Configuration
@Slf4j
public class JsonToRestIntegration {
@Autowired
private LoadBalancerExchangeFilterFunction lbFunction;
@Value("${json_folder}")
private String jsonPath;
@Value("${json_success_folder}")
private String jsonSuccessPath;
@Value("${json_error_folder}")
private String jsonErrorPath;
@Value("${rest-service-url}")
private String restServiceUrl;
@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel successChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel failureChannel() {
return new DirectChannel();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1000).get();
}
@Bean
public IntegrationFlow jsonFileToRestFlow() {
return …Run Code Online (Sandbox Code Playgroud) java integration-testing spring-integration spring-integration-dsl
我已经用任务执行器设置了文件轮询器
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
Run Code Online (Sandbox Code Playgroud)
正如所见,我已将threadpool每次轮询设置为固定的 10 条消息和最多 10 条消息。如果我放了 10 个文件,它仍然会一一处理。这里有什么问题?
* 更新 *
尽管我现在有其他问题,但在加里的回答之后它工作得很好。
我已经像这样设置了我的轮询器
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
Run Code Online (Sandbox Code Playgroud)
使用的原因是AcceptAll因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为AcceptAllFile
如果我更改AcceptOnceFileListFilter它可以工作,但是再次出现的同一个文件将不会再次被拾取!可以做些什么来避免这个问题?
问题/错误
在课堂上AbstractPersistentAcceptOnceFileListFilter我们有这个代码
@Override
public boolean accept(F file) …Run Code Online (Sandbox Code Playgroud) I upgrade my Spring boot version from 2.0.5.RELEASE to 2.1.8.RELEASE (so Spring Integration from 5.0 to 5.1) and the automatic type casting inside integration flow doesn't work anymore. I am used to define a set of @IntegrationConverter components and automatic casting with the operation transform(Type.class, p -> p) inside integration flow code but with the new version it seems to be broken.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from …Run Code Online (Sandbox Code Playgroud) java upgrade spring-integration spring-boot spring-integration-dsl
我不确定为什么会出现异常
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Run Code Online (Sandbox Code Playgroud)
它只是一个简单的 IntegrationFlow,但不确定我在下面的代码中缺少什么。
@Bean
Exchange messageExchange() {
return ExchangeBuilder
.directExchange("attr")
.durable(true)
.build();
}
@Bean
Queue queue() {
return QueueBuilder
.durable("attr_queue")
.build();
}
@Bean
Binding binding() {
return BindingBuilder
.bind(queue())
.to(messageExchange())
.with("attr_queue")
.noargs();
}
@Bean
IntegrationFlow deltaFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp
.inboundAdapter(connectionFactory, queue()))
.handle(String.class, (payload, headers) -> {
if (payload.isEmpty()) {
log.info("Payload empty");
} else {
log.info("Payload : " + payload);
}
return payload;
})
.get();
}
Run Code Online (Sandbox Code Playgroud)
我试图接触 Spring Integration,但不确定为什么我会收到此异常。我要做的就是使用 读取队列并将其inboundAdapter …
spring spring-integration spring-integration-amqp spring-integration-dsl
我一直在尝试配置 Spring Integration dsl 以从 Tibco EMS 主题读取,对收到的消息进行一些处理,然后将其推送到 ActiveMQ 队列。我能够使用 XML 配置成功地设置它,但想改用 spring 集成 dsl。我无法弄清楚,也无法在网上找到任何相关帮助。
我将消息推送到 ActiveMQ 的配置是这样的 -
@Bean
public IntegrationFlow toActiveMQFlow(
MessageChannel channel,
ActiveMQQueue queue,
CachingConnectionFactory cachingConnectionFactory) {
return IntegrationFlows.from(channel)
.transform(Object::toString)
.handle(Jms.outboundAdapter(cachingConnectionFactory).destination(queue))
.get();
}
Run Code Online (Sandbox Code Playgroud)
我认为从 Tibco EMS 主题读取的配置应该是这样的 -
@Bean
public IntegrationFlow fromTibcoTopicFlow(
MessageChannel channel,
ConnectionFactory tibcoEmsConnectionFactory,
Topic tibcoTopic
) {
return IntegrationFlows
.from(SomeInboundAdapter(tibcoEmsConnectionFactory).destination(tibcoTopic))
.transform(Object::toString)
.channel(channel)
.get();
}
Run Code Online (Sandbox Code Playgroud)
由于我在后一种配置上没有找到太多帮助,因此求助于 XML 配置是我唯一的选择吗?
请纠正/编辑/指出我仍在学习 Spring Integration DSL 时所犯的任何错误。
感谢你的帮助!
我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].
这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。
我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。
我显然在这里遗漏了一些东西。有小费吗?
spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl
当给定的评估返回 false 时,我需要将消息从父流路由到新流,但当该评估返回 true 时,让它继续在父流中。目前我已经能够使用 Spring Integration DSL.filter()方法成功实现此功能,没有任何问题。但是,我觉得.filter()这样使用似乎并不符合该方法的真正意图。是否有某种类型的路由器可以用来更好地实现同样的需求?是否有必要从这种实现更改.filter()为基于路由器的实现?
以下面的集成流程配置为例......
@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from("inboundChannel")
.filter(someService::someTrueFalseMethod, onFalseReturn -> onFalseReturn.discardChannel("otherFlowInboundChannel"))
.handle(someService::someHandleMethod)
.get();
}
@Bean
public IntegrationFlow otherFlow() {
return IntegrationFlows
.from("otherFlowInboundChannel")
.handle(someOtherService::someOtherHandleMethod)
.get();
}
Run Code Online (Sandbox Code Playgroud)
到目前为止,这似乎.routeToRecipents()可能是我需要使用的。在我的场景中,我需要评估消息的标头,这就是recipientMessageSelector使用它的原因。
@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from("inboundChannel"
.routeToRecipients(router -> router
.recipientMessageSelector("otherFlowInboundChannel", someService::someTrueFalseMethod)
.defaultOutputToParentFlow()
)
.handle(someService::someHandleMethod)
.get();
}
@Bean
public IntegrationFlow otherFlow() {
return IntegrationFlows
.from("otherFlowInboundChannel")
.handle(someOtherService::someOtherHandleMethod)
.get();
}
Run Code Online (Sandbox Code Playgroud)
即使这个routeToRecipients解决方案似乎有效,它和上面的过滤器实现之间真的有什么好处吗?
spring spring-integration spring-boot spring-integration-dsl
我有一个正在运行的应用程序,它定义了入站/出站网关,分离器,聚合器,路由器等的一些弹簧集成流程......
这些流都是使用spring integration dsl和annotations创建的......所以没有XML.
有没有可以生成EE模式图的工具?
这个问题表明Intellij可以为xml配置做到这一点......我想要一些类似于dsl IntegrationFlow的东西
目标
我想向某个主题发送一条消息,稍后我将使用客户端应用程序处理该消息。为此,我使用 Spring Boot 和 Spring Integration Java DSL 及其 JMS 模块。作为消息代理,我使用本机 ActiveMQ Artemis。
这是我的设置
演示应用程序.java
@SpringBootApplication
public class DemoApplication {
private static final Logger logger = LoggerFactory.getLogger(DemoApplication.class);
public interface StarGate {
void sendHello(String helloText);
}
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(StarGate.class)
.handle(Jms.outboundAdapter(connectionFactory)
.configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec
.deliveryPersistent(true)
.pubSubDomain(true)
.sessionTransacted(true)
.sessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
.explicitQosEnabled(true)
)
.destination(new ActiveMQTopic("wormhole")))
.get();
}
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
StarGate stargate = context.getBean(StarGate.class);
stargate.sendHello("Jaffa, kree!");
logger.info("Hello …Run Code Online (Sandbox Code Playgroud) jms spring-integration activemq-artemis spring-integration-dsl
我有一个接受字符串输入的 Flow
@Bean
public IntegrationFlow myFlow() {
// @formatter:off
return IntegrationFlows.from("some.input.channel")
.handle(someService)
.get();
Run Code Online (Sandbox Code Playgroud)
我如何从我的集成测试中调用它,如何在“some.input.channel”上放置一个字符串消息