在ActiveMQ中是否可以限制开放消费者交易所需的内存?

Mar*_*ano 5 java activemq-classic

我目前正在使用ActiveMQ 5.7.0和KahaDB,但如果需要可以升级.我使用嵌入式代理,并且有经验创建插件并以编程方式控制代理配置.

在我的应用程序中,我正在交易会话中创建一个消费者.消费者正在将数据传输到另一个服务(而不是ActiveMQ).这项服务允许我提交工作,但这些提交可能很昂贵.我发现会话消耗的未提交消息保存在代理的内存中.这要求我提交比我想要的更多的时间来释放代理内存资源.理想情况下,我想控制何时发生提交而不必考虑ActiveMQ内存利用率.

我目前的算法是:

  • 收到消息
  • 翻译消息并清除正文以节省消费者空间(msg.clearBody())
  • 发送消息到服务
  • 定期检查队列内存利用率
  • 如果代理内存严重,则提交

我希望能做的是:

  • 收到消息
  • 翻译消息并清除正文以节省消费者空间(msg.clearBody())
  • 发送消息到服务
  • 定期检查队列磁盘利用率
  • 如果代理磁盘利用率严重,则提交

我已经看过基于文件的光标,看起来它可以做我需要的东西,但是当我尝试使用它时,它似乎没有达到预期的效果.一个类似的问题也被要求在对ActiveMQ的用户讨论组过去.

更新
澄清.我们的问题不在于打开事务中的消息数量,而在于消息的大小.我们的应用程序经常需要处理大量消息(> 50MB).除了这个问题,ActiveMQ与这种大小的消息一起工作得很好.我们正在寻找的是一种msg.clearBody()在内存资源耗尽时触发代理的方法.然后,如果再次需要消息内容,则代理可以从磁盘支持的存储重新加载它们.我们愿意开发插件或扩展来实现这一目标.

ash*_*ash 4

正如我在 ActiveMQ 留言板上所述... http://activemq.2283324.n4.nabble.com/Transactions-and-memory-conspiration-td4224862.html

当事务正在进行时,ActiveMQ 无法从内存中清除这些消息。事务的数据结构保存消息引用。

一旦进入队列的内存中,消息只能通过以下方式之一提供给 JVM GC:

  • 确认消息
  • 消息过期
  • 完成消费消息的事务
  • 用于清除队列的管理工具

我对该用例非常好奇,因为如果 JVM 堆大小合适,代理可以在内存中保存大量消息。看到了什么样的性能数字以及通过增加数字预计会有多少改进?应该可以通过定期添加 Session.commit() 调用进行测试来衡量整体改进,或者对于 XA 事务,通过使用非事务队列消耗来衡量整体改进。

如果提交下游存在很长的延迟,也许可以使用并行处理来提高吞吐量的策略:

  • 填写交易
  • 开始提交
  • 同时,继续将传入的消息消费到下一个事务中
  • 填写第二笔交易
  • 等待第一个事务提交完成
  • 如果第一个事务失败,则回滚第二个事务
  • 如果第一个事务成功,则提交第二个事务并启动相同事务的下一个迭代

当然,从长远来看,如果下游系统跟不上传入消息的速度,就无法防止缓慢消耗问题导致代理过载。