我有一个问题,是否可以创建过滤器以防止消耗消息?例如,消息是:
public class ProcessingTask : IProcessingTask
{
public int Id { set; get; }
public string ExternalId { set; get; }
public ProcessingTaskStatus Status { set; get; }
public string Details { set; get; }
}
Run Code Online (Sandbox Code Playgroud)
所以我需要传递信息给消费者,如果消息Status是ProcessingTaskStatus.Received例如。换句话说,该消息需要被拒绝。
每当我启动以下代码时:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "direct_logs";
channel.exchangeDeclare(exchangeName, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "red");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println(message);
System.out.println("message received");
}
};
channel.basicConsume(queueName, true, consumer);
Run Code Online (Sandbox Code Playgroud)
它不会像文档中所暗示的那样开始无限循环。相反,它会立即停止。我可以消耗一段时间的唯一方法是channel.basicConsume用循环替换,如下所示:
DateTime startedAt = new DateTime();
DateTime stopAt = startedAt.plusSeconds(60);
long i=0;
try {
while (stopAt.compareTo(new …Run Code Online (Sandbox Code Playgroud) 我正在使用@ https://hub.docker.com/r/_/rabbitmq/提到的步骤在docker中安装Rabbit mq服务器。
安装顺利,让我的Rabbitmq正常运行。
我找不到兔子mq日志。
如何控制和打开日志记录?
我使用命令提示符运行时具有相同的代码.但是当我尝试使用浏览器运行它时,它会抛出错误.
我用过php-amqplib.我的代码简单地连接到RabbitMQ Queue并将消息发送到队列中.凭据是正确的,因为我能够在命令提示符下运行相同的文件.
注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符
注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符
注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符
致命错误:未捕获PhpAmqpLib\Exception\AMQPRuntimeException:读取数据时出错.在/var/www/html/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:242收到而不是预期的7个字节堆栈跟踪:
0 /var/www/html/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(149):phpAmqpLib\Wire\IO\StreamIO-> read(7)
1 /var/www/html/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106):PhpAmqpLib\Wire\AMQPReader-> rawread(7)
2 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(508):
PhpAmqpLib \电线\ AMQPReader->读取(7)
3 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(555):
PhpAmqpLib \连接\ AbstractConnection-> wait_frame(0)
4 /var/www/html/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(217):
PhpAmqpLib\Connection\AbstractConnection-> wait_channel(0,0)
5 /var/www/html/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(328):
PhpAmqpLib \频道\ AbstractChannel-> next_frame(0)
6 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(205):
第242行的/var/www/html/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php中的PhpAmqpLib\Channel\AbstractChannel-> w
我正在运行Red Hat Enterprise Linux 7.2,我已经安装了RabbitMQ并进行了systemctl status rabbitmq-server演示
●rabbitmq-server.service - LSB:启用RabbitMQ代理提供的AMQP服务Loaded:loaded(/etc/rc.d/init.d/rabbitmq-server)
Active:active(running)自2016年3月17日星期三11: 25:56 JST; 1小时14分钟前文件:man:systemd-sysv-generator(8)
但是,当我想配置它时,我找不到该文件:find / -iname "rabbitmq.config"什么都不显示.
我有一个连接到的应用程序127.0.0.1:3000,但RabbitMQ监听的默认端口是5672,所以我想添加端口3000(希望它能解决问题)
我正在尝试设置一个程序,它将从RabbitMQ中的每个队列中消耗,并根据某些消息运行某些脚本.不幸的是,当添加消费者时,如果它遇到单个错误(即没有找到超时或队列),则整个通道都已死亡.此外,队列来来去去,所以它必须经常刷新队列列表.这甚至可能吗?到目前为止,这是我的代码.
import pika
import requests
import sys
try:
host = sys.argv[1]
except:
host = "localhost"
def get_queues(host="localhost", port=15672, user="guest", passwd="guest", virtual_host=None):
url = 'http://%s:%s/api/queues/%s' % (host, port, virtual_host or '')
response = requests.get(url, auth=(user, passwd))
return response.json()
queues = get_queues(host)
def get_on_message(queue):
def on_message(channel, method_frame, header_frame, body):
print("message from", queue)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
return on_message
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
for queue in queues:
print(channel.is_open)
try:
channel.basic_consume(get_on_message(queue["name"]), queue["name"])
print("queue added",queue["name"])
except Exception as e:
print("queue failed",queue["name"])
sys.exit()
try:
channel.start_consuming()
except KeyboardInterrupt: …Run Code Online (Sandbox Code Playgroud) 我试图了解ES + CQRS,可以使用技术堆栈。根据我的理解,流程应如下。
请暗示我的理解是正确的。
我有一个调用的python文件tasks.py,其中我定义了4个单独的任务.我想配置芹菜以使用4个队列,因为每个队列将分配不同数量的工作人员.我正在阅读我应该使用route_task属性,但我尝试了几个选项而不是成功.
我正在关注这个doc celery route_tasks docs
我的目标是运行4个工作人员,每个任务一个,不要混合不同队列中不同工作人员的任务.这是可能的?这是一个很好的方法吗?
如果我做错了什么,我很乐意改变我的代码以使其工作
到目前为止,这是我的配置
tasks.py
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('queueA', routing_key='tasks.task_1'),
Queue('queueB', routing_key='tasks.task_2'),
Queue('queueC', routing_key='tasks.task_3'),
Queue('queueD', routing_key='tasks.task_4')
)
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
@app.task
def task_3():
print "Task of level 3"
@app.task
def task_4():
print "Task of level 4"
Run Code Online (Sandbox Code Playgroud)
为每个队列运行芹菜一名工人
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug …Run Code Online (Sandbox Code Playgroud) 我正在windows环境中使用RabbitMQ.我们还有用于windows的rabbitmqadmin吗?
我无法在rabbitmq的sbin文件夹中找到rabbitmqadmin.
请告诉我 .
我是AMQP / RabbitMQ新手,也是相对的Node.js新手。我可以在客户端使用amqplib NPM库吗?
我希望能够从Angular应用将消息直接推送到RabbitMQ。我已经使用Browserify模块化了很多客户端代码。我现在开始尝试RabbitMQ,并希望通过amqp协议将消息从浏览器直接推送到基于云的队列。
我已经通过NPM安装了amqplib并编写/粘贴了以下模块:
var amqp = require('amqplib/callback_api');
var push = function(){
console.log('This is the CORE queue.pusher push function being triggered');
var connString = 'amqp://username:pwd@blabla.rmq.cloudamqp.com/username';
amqp.connect(connString, function(err, conn) {
if (err){
console.log("core queue.pusher push error %s", err);
}else {
conn.createChannel(function (err, ch) {
var q = 'FatController';
var msg = 'Hello World!';
ch.assertQueue(q, {durable: false});
// Note: on Node 6 Buffer.from(msg) should be used
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent %s", msg);
});
setTimeout(function () {
conn.close();
process.exit(0) …Run Code Online (Sandbox Code Playgroud)