标签: message-queue

ZeroMQ工作人员应该如何安全"挂断"?

我本周开始使用ZeroMQ,当使用请求 - 响应模式时,我不确定如何让工作人员安全地"挂断"并关闭他的套接字而不可能丢弃消息并导致发送该消息的客户永远不会得到响应.想象一下用Python编写的工人看起来像这样:

import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
while i in range(8):
    s.recv()
    s.send('reply')
s.close()
Run Code Online (Sandbox Code Playgroud)

我一直在做实验,并且发现一个127.0.0.1:9999套接字类型的客户zmq.REQ做出一个公平排队的请求可能会让公司排队算法在工人完成最后一次工作之后send()但在工作之前就选择上面的工作人员.以下close()方法.在这种情况下,似乎请求由工作进程中的ØMQ堆栈接收和缓冲,并且当close()抛出与套接字关联的所有内容时请求将丢失.

工人如何"安全"分离 - 有没有办法发出"我不再需要消息"的信号,然后(a)循环传输信号期间到达的任何最终消息,(b)生成他们的回复,然后(c)执行close()保证不丢弃任何消息?

编辑:我想我想要输入的原始状态是"半封闭"状态,没有进一步的请求可以接收 - 并且发送者会知道 - 但返回路径仍然打开,以便我可以检查我的传入缓冲区是一个最后到达的消息,如果有一个坐在缓冲区中则响应它.

编辑:在回答一个好问题时,更正了描述以使等待消息的数量为多个,因为可能有许多连接在等待回复.

python concurrency rpc message-queue zeromq

20
推荐指数
1
解决办法
7724
查看次数

JMS性能

从性能角度理解JMS我遇到了一些麻烦.我们在应用程序中有这么简单的代码:

QueueConnection connection = null;
QueueSession session = null;
QueueSender sender = null;
TextMessage msg = null;

try {
  // The JNDIHelper uses InitialContext to look up things
  QueueConnectionFactory qcf = JNDIHelper.lookupFactory();
  Queue destQueue = JNDIHelper.lookupQueue();

  // These objects are created for every message, which is quite slow
  connection = qcf.createQueueConnection();
  session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  sender = session.createSender(destQueue);

  // This is the actual message
  msg = session.createTextMessage(xmlMsg);
  sender.setTimeToLive(0);
  sender.send(msg);
} 
finally {

  // Close all objects again
  JMSUtilities.safeClose(sender);
  JMSUtilities.safeClose(session);
  JMSUtilities.safeClose(connection); …
Run Code Online (Sandbox Code Playgroud)

java weblogic jms message-queue

20
推荐指数
2
解决办法
8305
查看次数

当我使用pika(python)向RabbitMQ尝试ack消息时出现错误"未知传递标记"

我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, …
Run Code Online (Sandbox Code Playgroud)

python message-queue rabbitmq pika

20
推荐指数
3
解决办法
3万
查看次数

了解zeromq java绑定

我正在调查zeromq作为java项目中的消息传递解决方案,但我发现有关java绑定的说明有点难以理解. http://www.zeromq.org/bindings:java

我不熟悉java绑定,所以这些可能是愚蠢的问题,但有人可以帮助我理解:

  1. 为什么我需要安装任何东西?
  2. 我在一台机器上构建的罐子可以在另一个系统上运行吗?我需要这个应用程序是可移植的.
  3. 如果是这样,为什么我要开始建立自己的罐子?

我觉得zeromq上提供的说明需要熟悉构建我缺乏的C项目,所以也许我只是在密集,但这似乎很多工作.

java message-queue zeromq

20
推荐指数
1
解决办法
8043
查看次数

SQL Server Service Broker的缺点

我一直在研究SQL Server Service Broker的范围来替换当前的消息传递解决方案MSMQ.我想知道SQL Server Service Broker的缺点与MSMQ相比,以下标准.

  1. 发展
  2. 故障排除
  3. 性能(假设我们需要每天处理100,000条消息,平均大小约为25 KB)
  4. 可扩展性

sql-server msmq message-queue service-broker

20
推荐指数
1
解决办法
3万
查看次数

Rabbitmq-设计消息重播服务

我正在尝试设计一种重播机制,使用户能够重放队列中的消息.我为包含多个队列和多个消费者的交换而设计的最佳设计是:

  1. 创建一个记录器服务,它将:

    • 创建一个队列并将所有路由键绑定到该队列.
    • 消费来自交易所的所有消息.
    • 将所有消息保存到数据库.
  2. 订阅者请求重播.

    • 每个订阅者创建一个新的交换,队列并使用与其常规队列相同的绑定绑定到它.
    • 订阅者向Web服务器发送休息请求以开始使用过滤器重播(startdate等).请求包含其重播交换名称.
    • Web服务器从数据库中提取数据并将其发布到特定的交换机
    • 可以添加细化,例如附加RequestId并回显它.

在此输入图像描述

问题:
1.这有意义吗?
我发明了轮子吗?有兔子固有的解决方案吗?插入?
3.创建多个交易所是否被视为良好做法?
在此解决方案中,创建每个队列的交换以便发布相同的消息.

另一种解决方案:
1.为每个队列创建一个额外的队列"ReplayQueue".设置一个TTL(假设一个月).
2.每次用户请求重播时,让他从自己的ReplayQueue重放,而不需要进行重放.

这个解决方案有点问题,因为.

  • 为了重播最后一天,消费者必须提前29天取出并过滤掉它们.
  • 此解决方案可以扩展 - 队列将变得更大(与可以扩展的数据库存储不同).

message-queue rabbitmq replay

20
推荐指数
1
解决办法
5349
查看次数

我应该将Celery或Carrot用于Django项目吗?

关于我应该使用哪一个,我有点困惑.我认为要么会有效,但是哪一个比另一个更好或更合适?

http://github.com/ask/carrot/tree/master

http://github.com/ask/celery/tree/master

python django message-queue amqp rabbitmq

19
推荐指数
2
解决办法
7029
查看次数

你/你应该在.NET应用程序中使用SQL Server Service Broker吗?

我在数据库中有许多需要触发应用程序代码的操作.目前我正在使用数据库轮询,但我听说SQL Server Service Broker可以提供类似MSMQ的功能.

  1. 我可以从在不同计算机上运行的.NET应用程序中侦听SQL Server Service Broker队列吗?
  2. 如果是的话,我应该这样做吗?
  3. 如果没有,你会推荐什么?

.net sql-server message-queue service-broker

19
推荐指数
2
解决办法
2万
查看次数

消息队列系统中的消息优先级本质上是不重要的吗?

似乎我所看到的大多数消息传递系统都具有对优先级消息队列的基本支持(如果有的话).例如,AMQP仅指定最少2个优先级.RabbitQQ是AMQP实现,不支持任何优先级.ActiveMQ将在几天内获得5.4版中10个消息优先级的支持.10个优先级是JMS规范指定的.

一个优先级队列基于与优先级的约束范围中的任意字段中字的非消息感命令其内容.为什么这样的实现不作为消息传递系统的一部分存在?正如我在标题中所说,优先考虑的是一种固有的非消息传递概念?

我意识到一个答案可能是优先级的概念引入了消息在队列中无限萎缩的可能性,同时处理了更高优先级的消息.还有其他原因吗?

messaging activemq-classic message-queue amqp rabbitmq

18
推荐指数
2
解决办法
2792
查看次数

获取RabbitMQ队列中的消息数

我们使用amqplib来发布/使用消息.我希望能够读取队列中的消息数(理想情况下都是已确认和未确认).这将允许我向管理员用户显示一个很好的状态图,并检测某个组件是否跟不上负载.

我在amqplib文档中找不到有关读取队列状态的任何信息.

有人能指出我正确的方向吗?

python message-queue rabbitmq py-amqplib

18
推荐指数
2
解决办法
2万
查看次数