Nic*_*las 5 amqp spring-integration
我有一个依赖于Spring Integration(4.0.4.RELEASE)和RabbitMQ的应用程序.我的流程如下:
消息通过进程放入队列(他们不期望任何答案):网关 - >频道 - > RabbitMQ
然后由另一个过程排干:
RabbitMQ --1--> inbound-channel-adapter A --2--> chain B --3--> aggregator C --4--> service-activator D --5--> final service-activator E
具体的是,无处在我的应用我使用分离器:汇聚器C只是等待足够的信息来,或者超时到期,然后转发一批服务d.消息可能会在聚合器C中停留很长时间,并且不应被视为在那里消耗.只有在服务D成功完成后才能使用它们.因此,我在入站通道适配器A上使用MANUAL确认,服务E负责确认批次.
通过重新定义聚合器,我解决了设置为AUTO时的确认问题.实际上,如果流中发生任何异步过程,则会立即确认消息(请参阅此处的问题).因此,我切换到MANUAL确认并实现了这样的聚合器:
<bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
<property name="inputChannel" ref="channel3"/>
<property name="handler">
<bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
<constructor-arg name="processor">
<bean class="com.test.AMQPAggregator"/>
</constructor-arg>
<property name="correlationStrategy">
<bean class="com.test.AggregatorDefaultCorrelationStrategy" />
</property>
<property name="releaseStrategy">
<bean class="com.test.AggregatorMongoReleaseStrategy" />
</property>
<property name="messageStore" ref="messageStoreBean"/>
<property name="expireGroupsUponCompletion" value="true"/>
<property name="sendPartialResultOnExpiry" value="true"/>
<property name="outputChannel" ref="channel4"/>
</bean>
</property>
</bean>
<bean id="messageStoreBean" class="org.springframework.integration.store.SimpleMessageStore"/>
<bean id="messageStoreReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore" />
<property name="timeout" value="${myapp.timeout}" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>
Run Code Online (Sandbox Code Playgroud)
我确实希望以不同的方式聚合标题,并保留所有amqp_deliveryTag的最高值,以便稍后在服务E中进行多重确认(参见本主题).到目前为止,这种方法效果很好,除了它比典型的聚合器命名空间(见这个旧的Jira票证)要冗长得多.
我只是使用基本配置:
链-B
<int:chain input-channel="channel2" output-channel="channel3">
<int:header-enricher>
<int:error-channel ref="errorChannel" /> // Probably useless
</int:header-enricher>
<int:json-to-object-transformer/>
<int:transformer ref="serviceABean"
method="doThis" />
<int:transformer ref="serviceBBean"
method="doThat" />
</int:chain>
Run Code Online (Sandbox Code Playgroud)
服务-d
<int:service-activator ref="serviceDBean"
method="doSomething"
input-channel="channel4"
output-channel="channel5" />
Run Code Online (Sandbox Code Playgroud)
由于我依赖MANUAL确认,我还需要手动拒绝消息,以防发生异常.我对入站通道适配器A有以下定义:
<int-amqp:inbound-channel-adapter channel="channel2"
queue-names="si.queue1"
error-channel="errorChannel"
mapped-request-headers="*"
acknowledge-mode="MANUAL"
prefetch-count="${properties.prefetch_count}"
connection-factory="rabbitConnectionFactory"/>
Run Code Online (Sandbox Code Playgroud)
我对errorChannel使用以下定义:
<int:chain input-channel="errorChannel">
<int:transformer ref="errorUnwrapperBean" method="unwrap" />
<int:service-activator ref="amqpAcknowledgerBean" method="rejectMessage" />
</int:chain>
Run Code Online (Sandbox Code Playgroud)
ErrorUnwrapper基于此代码和整个异常检测和消息拒绝工作得很好,直到消息到达汇聚器C.
如果在处理service-activator D中的消息时引发异常,那么我看到这个异常,但errorChannel似乎没有收到任何消息,并且我的ErrorUnwrapper unwrap ()方法没有被调用.抛出异常("ahahah")时我看到的定制堆栈跟踪如下:
2014-09-23 16:41:18,725 ERROR o.s.i.s.SimpleMessageStore:174: Exception in expiry callback
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
(...)
Caused by: java.lang.Exception: ahahaha
at com.myapp.ServiceD.doSomething(ServiceD.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
(...)
2014-09-23 16:41:18,733 ERROR o.s.s.s.TaskUtils$LoggingErrorHandler:95: Unexpected error occurred in scheduled task.
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
(...)
Run Code Online (Sandbox Code Playgroud)
如何告诉处理来自此类聚合器的消息的服务将错误发布到errorChannel?我试图在标题中通过标题丰富的错误通道指定没有运气.我使用默认的errorChannel定义,但我也尝试更改其名称并重新定义它.我在这里一无所知,尽管我发现这个和那个,我没有设法得到它的工作.在此先感谢您的帮助!
正如您通过 StackTrace 看到的,您的进程是从MessageGroupStoreReaperThread 启动的,而 Thread 是从 default 启动的ThreadPoolTaskScheduler。
因此,您必须为此提供一个自定义 bean:
<bean id="scheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="errorHandler">
<bean class="org.springframework.integration.channel.MessagePublishingErrorHandler">
<property name="defaultErrorChannel" ref="errorChannel"/>
</bean>
</property>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>
Run Code Online (Sandbox Code Playgroud)
error-channel然而,我看到了在 上的好处<aggregator>,我们确实有来自不同独立线程的几个点,但我们无法正常处理。
| 归档时间: |
|
| 查看次数: |
1548 次 |
| 最近记录: |