标签: message-queue

如何在RabbitMQ服务器上设置超时检测?

我正在尝试使用这个 python绑定RabbitMQ.

我注意到的一件事是,如果我不洁净地杀死一个消费者(模仿一个崩溃的程序),服务器会认为这个消费者仍然存在很长时间.结果是每个其他消息都将被忽略.

例如,如果您杀死消费者1次并重新连接,则将忽略1/2消息.如果您杀死另一个消费者,则将忽略2/3消息.如果你杀了第3个,那么将忽略3/4个消息,依此类推.

我试过打开确认,但它似乎没有帮助.我找到的唯一解决方案是手动停止服务器并重置它.

有没有更好的办法?

如何重新创建此方案

  • 运行rabbitmq.

  • 取消归档此库.

  • 此处下载消费者和发布者.运行amqp_consumer.py两次.运行amqp_publisher.py,输入一些数据并观察它是否按预期工作.消息以循环方式接收.

  • 使用kill -9或任务管理器杀死其中一个使用者进程.

  • 现在,当您发布消息时,50%的消息将丢失.

python message-queue amqp rabbitmq

17
推荐指数
3
解决办法
1万
查看次数

针对消息传递系统中的表的队列

我一直在体验真实生产环境中消息传递系统的优点和缺点,我必须承认,每次任何其他形式的消息传递队列时,组织良好的表或表模式都会跳动,因为:

  1. 数据永久存储在表格中.我已经看到很多java(jms)应用程序在未捕获的异常或其他错误的路上丢失或消失了消息.
  2. 队列往往会填满.相反,Db存储几乎是无限的.
  3. 表格很容易访问,而您必须使用esotic工具从队列中读取.

您对每种方法有何看法?

database queue message-queue

16
推荐指数
4
解决办法
7416
查看次数

消息队列通过CRON与DB表队列相对应

我们有一个很大的项目即将推出,有很多媒体处理(图像,视频)以及电子邮件输出等,通常我们将这些东西放入一个名为"email_queue"的表中,我们使用cron来运行脚本处理表中的队列.

我已经在像Beanstalkd这样的Message Queue系统上阅读了很多内容,甚至还设置了它.这很容易使用,问题是我不确定我是否遗漏了一些东西.

有人可以详细说明使用队列系统而不是表和CRON的好处吗?因为我真的看不到它们是什么.

谢谢

message-queue beanstalkd starling-server

16
推荐指数
2
解决办法
6203
查看次数

是否有一个快速的内存中队列,我可以使用交换项目,因为它达到一定的大小?

我一直在使用c/c ++/cuda不到一个星期,不熟悉库中所有可用的选项(对不起,如果我的问题太古怪或不可能).这是我的问题,我有一个过程,它接受数据并分析它然后做三件事中的一件,(1)保存结果,(2)丢弃结果或(3)打破数据并将其发送回处理.

通常选项(3)会创建大量数据,而且我很快就超出了我可用的内存(我的服务器是16演出),所以我解决这个问题的方法就是设置一个队列服务器(rabbitmq)来发送和接收工作from(它在达到一定大小的内存时交换队列).当我使用具有更快nics的小型服务器来传输数据时,这非常有效,但是最近我一直在学习并将我的代码从Java转换为c/c ++并在GPU上运行它,这使得队列成为一个很大的瓶颈.瓶颈显然是网络io(在廉价系统上进行分析表明cpu使用率很高,旧的gpu类似,但新的更快的cpus/gpus没有得到太多利用,网络IO稳定在300-400/mbs).所以我决定尝试完全消除网络并在服务器上本地运行队列服务器,这使得它更快但我怀疑如果我使用不依赖于外部网络服务的解决方案,它可能会更快(即使我我在本地运行它们.它可能不起作用但我想试验.

所以我的问题是,有没有什么我可以像队列那样使用我可以在读取条目时删除条目但是一旦达到一定大小就将队列交换到磁盘(但保持内存中队列总是满的,所以我不喜欢不必等待从磁盘读取?在了解Cuda时,有许多研究人员对大型数据集运行分析的例子,以及如何保持数据以最快的速度进入系统的任何想法(我想他们不受磁盘/网络的限制,否则更快的gpu的不会真的给他们增加性能的幅度)?

有这样的事吗?

ps如果它有帮助,到目前为止我已经尝试过rabbitmq(对我的情况来说太慢了),apollo mq(很好但是仍然基于网络),reddis(非常喜欢它但不能超过物理内存),玩mmap()和我我还压缩了我的数据以获得更好的吞吐量.我知道一般的解决方案,但我想知道是否有c/c ++,cuda或我可以使用的库本机(理想情况下,我会在Cuda全局内存中有一个队列交换到交换到磁盘的主机内存,所以GPU总是处于全速状态,但这可能是一厢情愿的想法).如果您还有其他任何想法可以让我知道,我会喜欢尝试它(如果它有帮助,我在Mac上开发并在Linux上运行).

c c++ io cuda message-queue

16
推荐指数
1
解决办法
926
查看次数

RabbitMQ:交换,队列和绑定 - 谁设置了什么?

使用RabbitMQ发送消息时,您基本上有交换,队列和绑定.我理解他们的想法以及他们如何相互联系,但我不确定是谁设置了什么.

基本上,我的应用程序中有三个场景.

场景1:一个发布者,几个工作进程

我想要实现的是一个将消息发送到队列的组件,并且应该有几个处理该队列中的项的工作进程.这对我来说似乎很容易.设置如下:

  • 交换:1个交换类型'直接'
  • 队列:1个队列
  • 绑定:队列绑定到交换

每当将消息发送到交换机时,它就会被传递到队列,并且工作进程会完成其任务.

一切都应该是耐用的.

那么谁设置了什么?在我看来:

  • 制片人创造交流
  • 生产者创建队列(因为当前可能没有工作进程在运行,如果没有队列,则消息将丢失)
  • 生产者将队列绑定到交换
  • 消费者只需听队列

对?

场景2:一个发布者,多个订阅者,易失性消息

第二种情况完全不同.基本上,它是一个发布/子方案,其中每条消息都发送到每个当前正在侦听的客户端.如果客户端脱机,它不再接收消息,并且不会存储在任何地方.这意味着以下设置:

  • 交换:1个交换类型'扇出'
  • 队列:n个队列,每个消费者一个队列
  • 绑定:每个队列都需要绑定到交换机

那么谁设置了什么?在我看来:

  • 制片人创造交流
  • 消费者创建队列(因为它是自己的队列,生产者无法知道谁对消息感兴趣)
  • 消费者为其队列创建绑定到交换
  • 消费者监听其队列

对?

场景3:一个发布者,多个订阅者,持久消息

基本上与方案2相同,但如果消费者离线,则不应丢失消息.在我看来,这不应该改变任何东西 - 对吗?

message-queue publish-subscribe rabbitmq

16
推荐指数
1
解决办法
3540
查看次数

AWS SNS中的延迟和吞吐量是否足以取代pub/sub的专用MQ?

为了HA,我正在考虑从自托管解决方案(ZeroMQ)切换到应用程序中pub/sub的AWS Simple Notification Service.这是应用程序的后端,因此应该是合理的实时.

什么是我可以期待SNS的延迟和吞吐量?

message-queue publish-subscribe zeromq amazon-web-services

16
推荐指数
1
解决办法
6515
查看次数

在Kafka阅读消息时重新平衡问题

我正在尝试阅读有关Kafka主题的消息,但我无法阅读它.一段时间后该进程被杀死,无需读取任何消息.

这是我得到的重新平衡错误:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages
Run Code Online (Sandbox Code Playgroud)

我试图跑ConsumerOffsetChecker,这是我得到的错误.我不知道如何解决这个问题.任何人,任何想法?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89) …
Run Code Online (Sandbox Code Playgroud)

message-queue flume apache-kafka apache-zookeeper

16
推荐指数
1
解决办法
3万
查看次数

如何使用EasyNetQ/RabbitMQ进行错误处理

我正在使用带有EasyNetQ库的C#中的RabbitMQ.我在这里使用了pub/sub模式.我仍然有一些问题希望有人可以帮助我:

  1. 当消费消息时出现错误时,它会自动移动到错误队列中.如何实现重试(以便将其放回原始队列,当它无法处理X次时,它会移动到死信队列)?
  2. 据我所知,总有1个错误队列用于转储来自所有其他队列的消息.如何为每种类型设置1个错误队列,以便每个队列都有自己的关联错误队列?
  3. 如何轻松重试错误队列中的消息?我试过Hosepipe,但它只是将消息重新发布到错误队列而不是原始队列.我不太喜欢这个选项,因为我不想在控制台中摆弄.我最好只针对错误队列进行编程.

任何人?

c# error-handling message-queue rabbitmq easynetq

16
推荐指数
3
解决办法
7634
查看次数

集中式网络日志 - 系统日志和备选方案?

在工作中,我们正在构建一个分布式应用程序(可能跨越局域网上的多台机器,可能稍后在WAN + VPN上跨越几个大陆).我们不希望每台机器都有本地日志文件(填满磁盘而无法集中查看),因此我们需要集中通过网络进行日志记录.大多数日志都不重要,因此UDP对他们来说很好,但有些是丢失重要的警报,必须可靠地传递,这意味着TCP.如果日志记录协议过于繁琐,我们担心拥塞网络,或者如果应用程序没有响应,则将应用程序拖到爬网中.

我考虑过的一些可能性是:

  • 系统日志(看起来很完美,但我的老板对此有一种敌意,所以我可能无法选择它).
  • 来自facebook的抄写员(但它似乎有点重量级,每台机器上都有一台服务器 - 并非每条日志消息都需要超级可靠性).
  • 使用像rabbitmq这样的消息队列,可以将多个队列调整到不同级别的事务安全性.
  • 最坏的情况,我可以从头开始写自己的.

你有其他建议吗?您使用了哪些集中式日志记录解决方案,以及它们的运行情况如何?

编辑:我倾向于抄写员,因为它的存储转发设计将正在运行的应用程序与网络延迟分离.但是在努力安装它之后,我发现(1)它不能作为二进制包使用 - 现在这是不可原谅的 - 而且(2)它非常依赖于一个不能作为二进制包提供的库(thrift)!最糟糕的是,它甚至无法正常编译.这不是发布质量代码,即使在开源中也是如此.

networking logging syslog message-queue

15
推荐指数
4
解决办法
1万
查看次数

消息队列系统

我正在编写消息队列系统.

我的问题是......用文件或数据库做这个队列会更好吗?

如果我选择数据库,它需要每秒检查一次新的工作,这对我来说似乎有点开销?

如果它是文件,我猜你只是不断监视文件夹并根据它执行?

BR,

c# queue message-queue

15
推荐指数
3
解决办法
1万
查看次数