我本周开始使用ZeroMQ,当使用请求 - 响应模式时,我不确定如何让工作人员安全地"挂断"并关闭他的套接字而不可能丢弃消息并导致发送该消息的客户永远不会得到响应.想象一下用Python编写的工人看起来像这样:
import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
while i in range(8):
s.recv()
s.send('reply')
s.close()
Run Code Online (Sandbox Code Playgroud)
我一直在做实验,并且发现一个127.0.0.1:9999套接字类型的客户zmq.REQ做出一个公平排队的请求可能会让公司排队算法在工人完成最后一次工作之后send()但在工作之前就选择上面的工作人员.以下close()方法.在这种情况下,似乎请求由工作进程中的ØMQ堆栈接收和缓冲,并且当close()抛出与套接字关联的所有内容时请求将丢失.
工人如何"安全"分离 - 有没有办法发出"我不再需要消息"的信号,然后(a)循环传输信号期间到达的任何最终消息,(b)生成他们的回复,然后(c)执行close()保证不丢弃任何消息?
编辑:我想我想要输入的原始状态是"半封闭"状态,没有进一步的请求可以接收 - 并且发送者会知道 - 但返回路径仍然打开,以便我可以检查我的传入缓冲区是一个最后到达的消息,如果有一个坐在缓冲区中则响应它.
编辑:在回答一个好问题时,更正了描述以使等待消息的数量为多个,因为可能有许多连接在等待回复.
从性能角度理解JMS我遇到了一些麻烦.我们在应用程序中有这么简单的代码:
QueueConnection connection = null;
QueueSession session = null;
QueueSender sender = null;
TextMessage msg = null;
try {
// The JNDIHelper uses InitialContext to look up things
QueueConnectionFactory qcf = JNDIHelper.lookupFactory();
Queue destQueue = JNDIHelper.lookupQueue();
// These objects are created for every message, which is quite slow
connection = qcf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
sender = session.createSender(destQueue);
// This is the actual message
msg = session.createTextMessage(xmlMsg);
sender.setTimeToLive(0);
sender.send(msg);
}
finally {
// Close all objects again
JMSUtilities.safeClose(sender);
JMSUtilities.safeClose(session);
JMSUtilities.safeClose(connection); …Run Code Online (Sandbox Code Playgroud) 我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
def doWork(body, args, channel):
r = random.random()
time.sleep(r * 10)
try:
channel.basic_ack(delivery_tag=args.delivery_tag)
except :
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
while True:
time.sleep(0.03)
try:
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, …Run Code Online (Sandbox Code Playgroud) 我正在调查zeromq作为java项目中的消息传递解决方案,但我发现有关java绑定的说明有点难以理解. http://www.zeromq.org/bindings:java
我不熟悉java绑定,所以这些可能是愚蠢的问题,但有人可以帮助我理解:
我觉得zeromq上提供的说明需要熟悉构建我缺乏的C项目,所以也许我只是在密集,但这似乎很多工作.
我一直在研究SQL Server Service Broker的范围来替换当前的消息传递解决方案MSMQ.我想知道SQL Server Service Broker的缺点与MSMQ相比,以下标准.
我正在尝试设计一种重播机制,使用户能够重放队列中的消息.我为包含多个队列和多个消费者的交换而设计的最佳设计是:
创建一个记录器服务,它将:
订阅者请求重播.

问题:
1.这有意义吗?
我发明了轮子吗?有兔子固有的解决方案吗?插入?
3.创建多个交易所是否被视为良好做法?
在此解决方案中,创建每个队列的交换以便发布相同的消息.
另一种解决方案:
1.为每个队列创建一个额外的队列"ReplayQueue".设置一个TTL(假设一个月).
2.每次用户请求重播时,让他从自己的ReplayQueue重放,而不需要进行重放.
这个解决方案有点问题,因为.
关于我应该使用哪一个,我有点困惑.我认为要么会有效,但是哪一个比另一个更好或更合适?
我在数据库中有许多需要触发应用程序代码的操作.目前我正在使用数据库轮询,但我听说SQL Server Service Broker可以提供类似MSMQ的功能.
我们使用amqplib来发布/使用消息.我希望能够读取队列中的消息数(理想情况下都是已确认和未确认).这将允许我向管理员用户显示一个很好的状态图,并检测某个组件是否跟不上负载.
我在amqplib文档中找不到有关读取队列状态的任何信息.
有人能指出我正确的方向吗?
message-queue ×10
rabbitmq ×5
python ×4
amqp ×2
java ×2
sql-server ×2
zeromq ×2
.net ×1
concurrency ×1
django ×1
jms ×1
messaging ×1
msmq ×1
pika ×1
py-amqplib ×1
replay ×1
rpc ×1
weblogic ×1