我注意到的一件事是,如果我不洁净地杀死一个消费者(模仿一个崩溃的程序),服务器会认为这个消费者仍然存在很长时间.结果是每个其他消息都将被忽略.
例如,如果您杀死消费者1次并重新连接,则将忽略1/2消息.如果您杀死另一个消费者,则将忽略2/3消息.如果你杀了第3个,那么将忽略3/4个消息,依此类推.
我试过打开确认,但它似乎没有帮助.我找到的唯一解决方案是手动停止服务器并重置它.
有没有更好的办法?
如何重新创建此方案
我一直在体验真实生产环境中消息传递系统的优点和缺点,我必须承认,每次任何其他形式的消息传递队列时,组织良好的表或表模式都会跳动,因为:
您对每种方法有何看法?
我们有一个很大的项目即将推出,有很多媒体处理(图像,视频)以及电子邮件输出等,通常我们将这些东西放入一个名为"email_queue"的表中,我们使用cron来运行脚本处理表中的队列.
我已经在像Beanstalkd这样的Message Queue系统上阅读了很多内容,甚至还设置了它.这很容易使用,问题是我不确定我是否遗漏了一些东西.
有人可以详细说明使用队列系统而不是表和CRON的好处吗?因为我真的看不到它们是什么.
谢谢
我一直在使用c/c ++/cuda不到一个星期,不熟悉库中所有可用的选项(对不起,如果我的问题太古怪或不可能).这是我的问题,我有一个过程,它接受数据并分析它然后做三件事中的一件,(1)保存结果,(2)丢弃结果或(3)打破数据并将其发送回处理.
通常选项(3)会创建大量数据,而且我很快就超出了我可用的内存(我的服务器是16演出),所以我解决这个问题的方法就是设置一个队列服务器(rabbitmq)来发送和接收工作from(它在达到一定大小的内存时交换队列).当我使用具有更快nics的小型服务器来传输数据时,这非常有效,但是最近我一直在学习并将我的代码从Java转换为c/c ++并在GPU上运行它,这使得队列成为一个很大的瓶颈.瓶颈显然是网络io(在廉价系统上进行分析表明cpu使用率很高,旧的gpu类似,但新的更快的cpus/gpus没有得到太多利用,网络IO稳定在300-400/mbs).所以我决定尝试完全消除网络并在服务器上本地运行队列服务器,这使得它更快但我怀疑如果我使用不依赖于外部网络服务的解决方案,它可能会更快(即使我我在本地运行它们.它可能不起作用但我想试验.
所以我的问题是,有没有什么我可以像队列那样使用我可以在读取条目时删除条目但是一旦达到一定大小就将队列交换到磁盘(但保持内存中队列总是满的,所以我不喜欢不必等待从磁盘读取?在了解Cuda时,有许多研究人员对大型数据集运行分析的例子,以及如何保持数据以最快的速度进入系统的任何想法(我想他们不受磁盘/网络的限制,否则更快的gpu的不会真的给他们增加性能的幅度)?
有这样的事吗?
ps如果它有帮助,到目前为止我已经尝试过rabbitmq(对我的情况来说太慢了),apollo mq(很好但是仍然基于网络),reddis(非常喜欢它但不能超过物理内存),玩mmap()和我我还压缩了我的数据以获得更好的吞吐量.我知道一般的解决方案,但我想知道是否有c/c ++,cuda或我可以使用的库本机(理想情况下,我会在Cuda全局内存中有一个队列交换到交换到磁盘的主机内存,所以GPU总是处于全速状态,但这可能是一厢情愿的想法).如果您还有其他任何想法可以让我知道,我会喜欢尝试它(如果它有帮助,我在Mac上开发并在Linux上运行).
使用RabbitMQ发送消息时,您基本上有交换,队列和绑定.我理解他们的想法以及他们如何相互联系,但我不确定是谁设置了什么.
基本上,我的应用程序中有三个场景.
我想要实现的是一个将消息发送到队列的组件,并且应该有几个处理该队列中的项的工作进程.这对我来说似乎很容易.设置如下:
每当将消息发送到交换机时,它就会被传递到队列,并且工作进程会完成其任务.
一切都应该是耐用的.
那么谁设置了什么?在我看来:
对?
第二种情况完全不同.基本上,它是一个发布/子方案,其中每条消息都发送到每个当前正在侦听的客户端.如果客户端脱机,它不再接收消息,并且不会存储在任何地方.这意味着以下设置:
那么谁设置了什么?在我看来:
对?
基本上与方案2相同,但如果消费者离线,则不应丢失消息.在我看来,这不应该改变任何东西 - 对吗?
为了HA,我正在考虑从自托管解决方案(ZeroMQ)切换到应用程序中pub/sub的AWS Simple Notification Service.这是应用程序的后端,因此应该是合理的实时.
什么是我可以期待SNS的延迟和吞吐量?
我正在尝试阅读有关Kafka主题的消息,但我无法阅读它.一段时间后该进程被杀死,无需读取任何消息.
这是我得到的重新平衡错误:
[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer: (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages
Run Code Online (Sandbox Code Playgroud)
我试图跑ConsumerOffsetChecker,这是我得到的错误.我不知道如何解决这个问题.任何人,任何想法?
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group topic_group
Group Topic Pid Offset logSize Lag Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89) …Run Code Online (Sandbox Code Playgroud) 我正在使用带有EasyNetQ库的C#中的RabbitMQ.我在这里使用了pub/sub模式.我仍然有一些问题希望有人可以帮助我:
任何人?
在工作中,我们正在构建一个分布式应用程序(可能跨越局域网上的多台机器,可能稍后在WAN + VPN上跨越几个大陆).我们不希望每台机器都有本地日志文件(填满磁盘而无法集中查看),因此我们需要集中通过网络进行日志记录.大多数日志都不重要,因此UDP对他们来说很好,但有些是丢失重要的警报,必须可靠地传递,这意味着TCP.如果日志记录协议过于繁琐,我们担心拥塞网络,或者如果应用程序没有响应,则将应用程序拖到爬网中.
我考虑过的一些可能性是:
你有其他建议吗?您使用了哪些集中式日志记录解决方案,以及它们的运行情况如何?
编辑:我倾向于抄写员,因为它的存储转发设计将正在运行的应用程序与网络延迟分离.但是在努力安装它之后,我发现(1)它不能作为二进制包使用 - 现在这是不可原谅的 - 而且(2)它非常依赖于一个不能作为二进制包提供的库(thrift)!最糟糕的是,它甚至无法正常编译.这不是发布质量代码,即使在开源中也是如此.
我正在编写消息队列系统.
我的问题是......用文件或数据库做这个队列会更好吗?
如果我选择数据库,它需要每秒检查一次新的工作,这对我来说似乎有点开销?
如果它是文件,我猜你只是不断监视文件夹并根据它执行?
BR,
message-queue ×10
rabbitmq ×3
c# ×2
queue ×2
amqp ×1
apache-kafka ×1
beanstalkd ×1
c ×1
c++ ×1
cuda ×1
database ×1
easynetq ×1
flume ×1
io ×1
logging ×1
networking ×1
python ×1
syslog ×1
zeromq ×1