我正在寻求解决消息传递服务器和队列的 FIFO 性质所遇到的问题。在某些情况下,我希望根据消息传递顺序以外的标准将队列中的消息分发到消费者池。理想情况下,这将防止用户占用系统中的共享资源。以这个过于简化的场景为例:
鉴于每个用户的垃圾箱中可能有大量消息,我们有哪些选项可以允许并发处理每个垃圾箱而不考虑排队时间?在我看来,有几个明显的解决方案:
在我们的例子中,创建一个单独的队列并管理每个用户的消费者确实不切实际。这是可以做到的,但我认为如果合理的话我真的更喜欢第二种选择。我们正在使用 RabbitMQ,但如果有更适合此任务的技术,则不一定与其绑定。
我正在考虑使用 Rabbit 的消息优先级来帮助随机发送的想法。通过随机为消息分配 1 到 10 之间的优先级,这应该有助于分发消息。这种方法的问题是,如果队列永远不会完全清空,那么具有最低优先级的消息可能会永远卡在队列中。我以为我可以在消息上使用 TTL,然后以升级的优先级重新排队消息,但我在文档中注意到了这一点:
应过期的消息仍然只会从队列头部过期。这意味着与普通队列不同,即使每个队列的 TTL 也可能导致过期的低优先级消息卡在未过期的高优先级消息后面。这些消息永远不会被传递,但它们会出现在队列统计信息中。
我担心我可能会因为这种方法而陷入兔子洞。我想知道其他人是如何解决这个问题的。任何有关创意路由、消息传递模式或任何替代解决方案的反馈将不胜感激。
我正在设计一个系统,它将使用 RabbitMQ 在应用程序之间进行请求/响应。
我习惯于使用 REST API,并且从这个背景出发,我一直在思考如何在执行请求/响应时构造消息。
我需要构建它来处理几种情况:
我计划将有效负载 JSON 格式化。我正在考虑使用某种类似于 HTTP 的响应代码(也许使用相同的代码?)并将响应代码设置为消息上的属性/标头。
对于获取/查询,我的想法是在有效负载对象中拥有一个查询属性。
但这让我想到,我可能认为这太像 REST API,并且可能有一些更好、更成熟的方法来做到这一点。
在进行设置时,我一直在阅读《RabbitMQ in Action》一书,但我没有看到其中提到这一点。我的 google-fu 也让我失败了,没有提供任何结果。
有经验的人愿意分享他们如何构建信息吗?
这是我的配置:
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
String url = this.environment.getProperty("jms.broker.url");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(url);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(5);
return redeliveryPolicy;
}
.....
Run Code Online (Sandbox Code Playgroud)
这是我的消费者:
@Service("msgConsumer")
public class MessageConsumer {
private static final String ORDER_RESPONSE_QUEUE = "thequeue.Q";
@JmsListener(destination = ORDER_RESPONSE_QUEUE, containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(final Message<String> message) throws Exception {
MessageHeaders headers = message.getHeaders();
LOG.info("Application : headers received : {}", headers);
String response = message.getPayload();
LOG.info("Application …Run Code Online (Sandbox Code Playgroud) 我有一个装饰有的芹菜任务,autoretry_for以便在发生已知异常时,它将重试该任务。这里有一个虚拟版本:
class ExpectedException(Exception):
pass
@app.task(autoretry_for=(ExpectedException,), retry_kwargs={'max_retries': 2, 'countdown': 1})
def decorated_autoretry():
logging.info(
"Attempt: {attempt} of {attempts}".format(
attempt=decorated_autoretry.request.retries, attempts=decorated_autoretry.max_retries
)
)
raise ExpectedException
Run Code Online (Sandbox Code Playgroud)
当运行给出以下输出时:
[2018-01-30 12:17:31,899: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a]
[2018-01-30 12:17:31,900: INFO/ForkPoolWorker-1] Attempt: 1 of 3
[2018-01-30 12:17:31,915: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] retry: Retry in 1s: ExpectedException()
[2018-01-30 12:17:31,915: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] ETA:[2018-01-30 12:17:32.901955+00:00]
[2018-01-30 12:17:33,024: INFO/ForkPoolWorker-2] Attempt: 2 of 3
[2018-01-30 12:17:33,072: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] ETA:[2018-01-30 12:17:34.029462+00:00]
[2018-01-30 12:17:33,072: INFO/ForkPoolWorker-2] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] retry: Retry in 1s: …Run Code Online (Sandbox Code Playgroud) 我对不同的消息代理感到困惑。
我的设备正在使用 MQTT。到目前为止,我已经研究过 HiveMQ、IBM Messagesight、RabbitMQ、google pub and sub、AWS SQS。
HiveMQ(MQTT消息代理)和RabbitMQ(或pub and sub,SQS)有什么区别?
除了协议和成本可能不同之外,它们的功能有什么区别吗?还有一个问题,IBM messagesight 是一种消息代理吗?它与 google pub and sub 或rabbit 有什么不同吗?
我找不到任何有关 messagesight 的信息。
根据此文档,服务总线支持两种模式:接收并删除和窥视锁定。
如果使用 Peek-Lock 模式,如果消费者在处理消息后立即崩溃/挂起/执行很长的 GC,但在 messageId 为“已完成”并且可见性时间到期之前,有可能会两次传递同一条消息。
那么微软是怎么说Service Bus支持最多一次交付模式的呢?是因为接收并删除模式只发送一次消息吗?但是话又说回来,如果消费者在处理消息时发生了某些事情,那么有价值的信息就会丢失。
如果是,那么使用 Azure 服务总线作为队列和 Azure 函数作为消费者来确保精确一次交付的最佳方法是什么。
PS 我能想到的一种方法是将 MessageID 存储在 blob 中,但在我的情况下,MessageID 的数量可能非常大,存储和加载所有消息并不是正确的方法。
我们正在开发一个系统,该系统使用rabbitMQ 在其客户端和服务器之间发送和接收数据。Internet 连接有时可能会丢失。
1- 队列中的所有消息都可以导出到一个文件吗?并以某种方式使用此文件导入到客户端?
2- 在另一种情况下,客户端想要向队列发送一些消息,但它没有互联网连接!所以我们想从客户端导出所有消息并制作一个文件并以某种方式将其发送到服务器(例如,将其传输到另一个有互联网的位置),是否可以将此文件导入队列?
我浏览了消息传递语义的 Apache Pulsar 文档。Apache 函数提到的交付语义(至少一次,最多一次和有效一次),如果我们不使用 Apache 函数,那么可用的所有不同交付语义是什么?
我正在尝试编写一个小的wsgi应用程序,它会在每次请求后将一些对象放到外部队列中.我想批量生产,即.使web服务器将对象放入内存中类似缓冲区的结构,并将另一个线程和/或进程批量发送到队列,缓冲区足够大或在某些超时后清除缓冲区.我不想参加NIH综合症,也不想打扰线程,但我找不到合适的代码来完成这项工作.有什么建议?
让Perl脚本通过数据库互相发送消息,但想知道是否有办法让它们直接相互通信 - 如果是,如何?如果重要,这两个脚本都在一台服务器上运行,一台脚本可以启动另一台服务器.
更新: 相关问题 - " ipc + perl "标记的问题.