Rai*_*baz 5 apache-camel rabbitmq kotlin
我有一组 Camel 路由配置为读取和写入 RabbitMQ 队列,或多或少是这样的:
from("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}&routingKey=${it.rabbitMQQueue}&SOME_MORE_PROPERTIES")
.log("Read message from queue ${it.rabbitMQQueue}")
.routeId(it.rabbitMQQueue)
.noAutoStartup()
.bean(it.rabbitMQBean)
.choice()
.`when`(PredicateBuilder.and(simple("$myCondition"), isNotNull(body())))
.split(body())
.toD("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueueDestination}&autoDelete=false&routingKey=${it.rabbitMQQueueDestination}&bridgeEndpoint=true")
.endChoice()
.otherwise()
end()
Run Code Online (Sandbox Code Playgroud)
SOME_MORE_PROPERTIES基本上在哪里autoDelete=false&autoAck=false和一些消息预取设置。
我的 ConnectionFactory 是一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory.
每当消息进入我的源队列时,就会启动一个线程来处理它;然而,处理完成后,它挂在等待状态,永远不会被释放或终止,所以我的应用程序内存在一段时间后饱和,垃圾收集器对此无能为力。
运行一段时间后,我的应用程序基本处于这种状态:
如果我手动重新启动路由,线程将终止并释放内存。
我的路由配置是否有什么错误导致线程无法正确终止?
我想避免每隔一段时间编写一个石英作业来重新启动路由。
编辑:我最近也从 Camel 2.24.0 更新到 Camel 3 的最新 RC,但问题仍然存在。
好吧,事实证明处于 WAIT 状态的线程应该存在,因为默认情况下 Camel 消费者的线程池大小是 10 个(事实上,我最多有 10 *我的路由数)线程。
现在,配置线程池大小是可以完成的,但它并不像看起来那么容易,因为有几种不同的方法可以做到这一点。
threadPoolSize对我来说解决这个问题的方法是在rabbitMQ URI中设置参数:
from("rabbitmq:$rabbitMQVhost?threadPoolSize=5&connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}
Run Code Online (Sandbox Code Playgroud)
通过这样做,在所有路由上处理多条消息后,处于 WAIT 状态的线程数如预期为 5 *我的路由数,这在我的情况下更好:我没有任何大的并发要求,但我有大量的路由,并且每条路由都有 10 个挂起的线程及其内存占用,这很快就会耗尽我的内存。
把这个留在这里,因为看起来没有太多关于这个主题的文档,我不得不为此绞尽脑汁好几天。
| 归档时间: |
|
| 查看次数: |
187 次 |
| 最近记录: |