Spring Integration:如何一次处理多条消息?

sel*_*rce 8 xml spring message-queue spring-integration

我有以下配置:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
Run Code Online (Sandbox Code Playgroud)

并将消息处理程序定义为

@Override
public void handleMessage(Message<?> message) throws MessagingException {
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) {
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
    } else {
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
}
Run Code Online (Sandbox Code Playgroud)

我想拥有的是什么

@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
Run Code Online (Sandbox Code Playgroud)

所以基本上轮询器在一次调用中发送所有10条消息,而不是每条消息调用10次方法.

这样做的原因是可以批量处理块中的所有消息,从而提高性能.

Art*_*lan 5

是的,因为(AbstractPollingEndpoint):

taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
            if (!pollingTask.call()) {
                break;
            }
...
    }
});
Run Code Online (Sandbox Code Playgroud)

因此,您所有的消息(max-messages-per-poll)都在同一线程中处理。但是,它们被一一发送到处理程序,而不是一堆发送。

若要并行处理,应ExecutorChannel在之前使用logEntryPostProcessorReceiver。像这样:

<channel id="executorChannel">
   <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
   <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
Run Code Online (Sandbox Code Playgroud)

更新

要批量处理消息,您应该对aggregate它们进行处理。由于它们都是的结果polling endpoint,因此sequenceDetails消息中没有任何内容。您可以通过以下伪造的值来克服它correlationId

<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 10"/>
Run Code Online (Sandbox Code Playgroud)

size() == 10应等于max-messages-per-poll

这之后您logEntryPostProcessorReceiver有应用listpayload秒。或仅一封邮件,这payload是的结果列表<aggregator>