我有一个自我执行的jar程序,它很大程度上依赖于Spring Integration.我遇到的问题是程序在其他Spring bean完成之前终止.
下面是我正在使用的代码的简化版本,如果需要,我可以提供更多代码/配置.入口点是一个main()方法,它引导Spring并启动导入过程:
public static void main(String[] args) {
ctx = new ClassPathXmlApplicationContext("flow.xml");
DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
try {
importer.startImport();
} catch (Exception e) {
e.printStackTrace();
} finally {
ctx.close();
}
}
Run Code Online (Sandbox Code Playgroud)
DataImporter包含一个简单的循环,用于将消息发送到Spring Integration网关.这为流程提供了一种主动的"推送"方法,而不是轮询数据的常用方法.这是我的问题所在:
public void startImport() throws Exception {
for (Item item : items) {
gatewayBean.publish(item);
Thread.sleep(200); // Yield period
}
}
Run Code Online (Sandbox Code Playgroud)
为了完整起见,流XML看起来像这样:
<gateway default-request-channel="inChannel" service-interface="GatewayBean" />
<splitter input-channel="inChannel" output-channel="splitChannel" />
<payload-type-router input-channel="splitChannel">
<mapping type="Item" channel="itemChannel" />
<mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>
<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" …Run Code Online (Sandbox Code Playgroud) 我正在尝试在Spring中创建一个服务器,它正在侦听TCP端口并接受连接.我知道如何将传入的请求路由到我的服务,它可以响应这些请求.但是我想在没有收到任何请求的情况下向某些客户发送消息.例如,有时我必须告知客户它已收到消息.
为此,我认为我需要一种方法来识别客户端,例如让他们登录.有没有办法为每个活动连接都有一个"会话"对象,我可以在其中存储登录数据?
如何向使用用户名X登录的客户端发送消息?
这在春天有可能吗?
入站和出站通道适配器之间的根本区别是什么?
任何例子都会非常有用.
我已经回顾了Spring文档,这种"定向"区别对我来说并不清楚.我支持配置了出站通道适配器的应用程序,但我发现行为计数器直观的出站标签.此适配器获取外部文件,然后将其带入我们解析文件并保留数据的应用程序.
这与此问题类似,但我想更加关注通道适配器,并希望获得更多反馈!
谢谢!
我正在尝试使用spring集成设置一个简单的应用程序.目标是简单地使用文件入站通道适配器来监视目录中的新文件和处理文件.为简单起见,此时处理文件只是记录一些输出(正在处理的文件的名称).但我想以多线程方式处理文件.因此,假设有10个文件被拾取并且应该并行处理,一旦完成这些文件,我们就会转到接下来的10个文件.
为此我尝试了两种不同的方法,两者似乎都工作方式相似,我想了解使用poller或dispatcher之间的区别.
方法#1 - 使用轮询器
<int-file:inbound-channel-adapter id="filesIn" directory="in">
<int:poller fixed-rate="1" task-executor="executor" />
</int-file:inbound-channel-adapter>
<int:service-activator ref="moveToStage" method="move" input-channel="filesIn" />
<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="DISCARD" />
Run Code Online (Sandbox Code Playgroud)
所以在这里,我理解的想法是我们不断轮询目录,一旦收到文件,它就会发送到filesIn通道,直到达到池限制.然后,直到池被占用,即使我假设轮询仍在后台继续,也不会发送其他文件.这似乎有效,但我不确定使用每次轮询的最大消息是否有助于降低轮询频率.通过将每个轮询的最大消息数设置为接近池大小.
方法#2 - 使用调度程序
<int-file:inbound-channel-adapter id="filesIn" directory="in">
<int:poller fixed-rate="5000" max-messages-per-poll="3" />
</int-file:inbound-channel-adapter>
<int:bridge input-channel="filesIn" output-channel="filesReady" />
<int:channel id="filesReady">
<int:dispatcher task-executor="executor"/>
</int:channel>
<int:service-activator ref="moveToStage" method="move" input-channel="filesInReady" />
<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="CALLER_RUNS" />
Run Code Online (Sandbox Code Playgroud)
好吧所以这里轮询器没有使用执行器,所以我假设它以顺序方式进行轮询.应该拾取每个poll 3文件,然后发送到filesReady通道,然后使用调度程序将文件传递给服务激活器,因为它使用执行程序进行调度,它立即返回控制并允许filesIn通道发送更多文件.
我想我的问题是我是否正确理解这两种方法,如果一种方法比其他方法更好.
谢谢
我是Spring的新成员,并在Spring集成http模块中工作,以满足我的项目需求.我从出站网关发送请求作为http客户端.我正在尝试向服务器发起请求,服务器应该使用我的设置值返回消息有效负载.我正在将对象转换为JSON使用发送到服务器我正在从客户端(HttpClientDemo)向服务器端的入站网关发送请求,如下所示.为此,我将我的对象转换为JSON,然后在客户端转换为JSON字符串到对象,在那里执行一些简单的操作并将其发送回客户端(HttpClientDemo)但在此之前,我得到与之相关的异常HttpMessageConverter如下:
Exception in thread "main" org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.mycompany.MyChannel.model.FFSampleResponseHttp] and content type [text/plain;charset=UTF-8]
at org.springframework.web.client.HttpMessageConverterExtractor.extractData(HttpMessageConverterExtractor.java:108)
at org.springframework.web.client.RestTemplate$ResponseEntityResponseExtractor.extractData(RestTemplate.java:784)
at org.springframework.web.client.RestTemplate$ResponseEntityResponseExtractor.extractData(RestTemplate.java:769)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:549)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:517)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:462)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.handleRequestMessage(HttpRequestExecutingMessageHandler.java:421)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribablMyChannel.doSend(AbstractSubscribablMyChannel.java:77)
at org.springframework.integration.channel.AbstractMessagMyChannel.send(AbstractMessagMyChannel.java:255)
at org.springframework.integration.channel.AbstractMessagMyChannel.send(AbstractMessagMyChannel.java:223)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:93)
Run Code Online (Sandbox Code Playgroud)
请在下面找到相关代码:
客户端代码:HttpClientDemo.java
public class HttpClientDemo {
private static Logger logger = Logger.getLogger(HttpClientDemo.class);
public static void main(String[] args) {
ApplicationContext context = …Run Code Online (Sandbox Code Playgroud) 将项目从 Spring Boot v2.7 迁移到 v3.0(从而从 Spring Integration v5.5 迁移到 v6.0)后,会打印出以下警告:
\nWARN 22084 --- [ restartedMain] ocalVariableTableParameterNameDiscoverer : Using deprecated \'-debug\' fallback for parameter name resolution. Compile the affected code with \'-parameters\' instead or avoid its introspection: com.foobar.MyClassA\nWARN 22084 --- [ restartedMain] ocalVariableTableParameterNameDiscoverer : Using deprecated \'-debug\' fallback for parameter name resolution. Compile the affected code with \'-parameters\' instead or avoid its introspection: com.foobar.MyClassB\nWARN 22084 --- [ restartedMain] ocalVariableTableParameterNameDiscoverer : Using deprecated \'-debug\' fallback for parameter name resolution. Compile …Run Code Online (Sandbox Code Playgroud) 我做了一些搜索,但找不到任何样本/示例.
我需要读取一个表(输入)的地理坐标,处理它以生成与坐标相关的POI.因此,一个地理坐标将导致需要将一个或多个POI插入另一个表(输出).
我目前正在使用JdbcCursorItemReader和JdbcBatchItemWriter来读取一个项目/记录并写入一个项目/记录.还有一个ItemProcessor,它为给定的地理坐标生成POI.
自定义JdbcBatchItemWriter是否帮助我实现了这一目标?
有任何想法吗?TIA.
嗨,我很难解决我的xml配置,
这是我的spring integration config xml:
<context:annotation-config />
<context:component-scan base-package="hk.com.test.spring.integration" />
<int:channel id="orders" />
<int:channel id="drinks" />
<int:channel id="hotDrink">
<int:queue capacity="5" />
</int:channel>
<int:channel id="coldDrink">
<int:queue capacity="10" />
</int:channel>
<bean id="drinkRouter" class="hk.com.test.spring.integration.DrinkRouter" />
<bean id="orderSplitter" class="hk.com.test.spring.integration.OrderSplitter" />
<bean id="barista" class="hk.com.test.spring.integration.Barista" />
<int:gateway id="cafe" service-interface="hk.com.test.spring.integration.Cafe" />
<int:splitter input-channel="orders" ref="orderSplitter"
method="split" output-channel="drinks" />
<int:router input-channel="drinks" ref="drinkRouter" method="resolveItemChannel" />
<int:service-activator input-channel="coldDrink"
ref="barista" method="prepareColdDrink" />
<int:service-activator input-channel="hotDrink"
ref="barista" method="preparehotDrink" />
Run Code Online (Sandbox Code Playgroud)
这是我的主要课程::
public class Main {
public static void main(String args[]) {
System.out.println("Hello");
// load the …Run Code Online (Sandbox Code Playgroud) 根据这里提供的文档,我正在尝试使用POC将消息传入监听器,如同一文档中所述,下面是我编写配置的方法.
@Configuration
public class KafkaConsumerConfig {
public static final String TEST_TOPIC_ID = "record-stream";
@Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
private String topic;
@Value("${kafka.address:localhost:9092}")
private String brokerAddress;
/*
@Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String>
kafkaMessageDrivenChannelAdapter = new
KafkaMessageDrivenChannelAdapter<>( container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
kafkaMessageDrivenChannelAdapter; }
@Bean public QueueChannel received() { return new QueueChannel(); }
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
/* …Run Code Online (Sandbox Code Playgroud) 我正在使用来自amazon SQS队列的消息.我在队列中有数千条消息.当我启动应用程序(使用带有spring框架的Java编写)时,它开始从队列中轮询消息,并在收到500条消息后停止.如果我再次启动应用程序它将消耗另外500条消息.
我的代码就像......
连接工厂
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("3-15");
factory.setReceiveTimeout(3000L);
return factory;
}
@Bean(name = "sqsJmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory());
factory.setConcurrency("3-15");
factory.setReceiveTimeout(3000L);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
倾听者
@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue")
public void onMessage(Message message) {
//Processing message
}
Run Code Online (Sandbox Code Playgroud)
我需要在亚马逊队列或连接工厂bean中配置什么?
谢谢 :-)
更新:添加了线程转储
Application正在消耗消息
,而线程转储中的DefaultMessageListenerContainer就像
"DefaultMessageListenerContainer-1@8242" prio=5 tid=0x18 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503) …Run Code Online (Sandbox Code Playgroud) spring ×7
java ×6
amazon-sqs ×1
apache-kafka ×1
aws-sdk ×1
messaging ×1
poller ×1
sockets ×1
spring-batch ×1
spring-boot ×1
tcp ×1