sel*_*rce 8 xml spring message-queue spring-integration
我有以下配置:
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactoryDefault"/>
</bean>
<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
<int:queue message-store="mongoDbMessageStore"/>
</int:channel>
<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
<int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
Run Code Online (Sandbox Code Playgroud)
并将消息处理程序定义为
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof LogEntry) {
LogEntry logEntry = (LogEntry) payload;
String app = (String) message.getHeaders().get("app");
logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
logEntryPostProcessService.postProcess(app, logEntry);
} else {
throw new MessageRejectedException(message, "Unknown data type has been received.");
}
}
Run Code Online (Sandbox Code Playgroud)
我想拥有的是什么
@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
Run Code Online (Sandbox Code Playgroud)
所以基本上轮询器在一次调用中发送所有10条消息,而不是每条消息调用10次方法.
这样做的原因是可以批量处理块中的所有消息,从而提高性能.
是的,因为(AbstractPollingEndpoint):
taskExecutor.execute(new Runnable() {
@Override
public void run() {
int count = 0;
while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
if (!pollingTask.call()) {
break;
}
...
}
});
Run Code Online (Sandbox Code Playgroud)
因此,您所有的消息(max-messages-per-poll)都在同一线程中处理。但是,它们被一一发送到处理程序,而不是一堆发送。
若要并行处理,应ExecutorChannel在之前使用logEntryPostProcessorReceiver。像这样:
<channel id="executorChannel">
<dispatcher task-executor="threadPoolExecutor"/>
</channel>
<bridge input-channel="logEntryChannel" output-channel="executorChannel">
<poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>
<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
Run Code Online (Sandbox Code Playgroud)
更新
要批量处理消息,您应该对aggregate它们进行处理。由于它们都是的结果polling endpoint,因此sequenceDetails消息中没有任何内容。您可以通过以下伪造的值来克服它correlationId:
<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 10"/>
Run Code Online (Sandbox Code Playgroud)
凡size() == 10应等于max-messages-per-poll。
这之后您logEntryPostProcessorReceiver有应用list的payload秒。或仅一封邮件,这payload是的结果列表<aggregator>。
| 归档时间: |
|
| 查看次数: |
4282 次 |
| 最近记录: |