标签: rabbitmq

masstransit在消耗时过滤接收到的消息

我有一个问题,是否可以创建过滤器以防止消耗消息?例如,消息是:

public class ProcessingTask : IProcessingTask
{
    public int Id { set; get; }
    public string ExternalId { set; get; }
    public ProcessingTaskStatus Status { set; get; }
    public string Details { set; get; }
}
Run Code Online (Sandbox Code Playgroud)

所以我需要传递信息给消费者,如果消息StatusProcessingTaskStatus.Received例如。换句话说,该消息需要被拒绝。

c# masstransit rabbitmq

1
推荐指数
1
解决办法
1164
查看次数

我的channel.basicConsume为何不等待消息

每当我启动以下代码时:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "direct_logs";
    channel.exchangeDeclare(exchangeName, "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, "red");
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException{
            String message = new String(body, "UTF-8");
            System.out.println(message);
            System.out.println("message received");
        }
    };

    channel.basicConsume(queueName, true, consumer);
Run Code Online (Sandbox Code Playgroud)

它不会像文档中所暗示的那样开始无限循环。相反,它会立即停止。我可以消耗一段时间的唯一方法是channel.basicConsume用循环替换,如下所示:

    DateTime startedAt = new DateTime();
    DateTime stopAt = startedAt.plusSeconds(60);
    long i=0;
    try {
        while (stopAt.compareTo(new …
Run Code Online (Sandbox Code Playgroud)

message-queue rabbitmq java-8

1
推荐指数
1
解决办法
2560
查看次数

在Docker映像中启用Rabbit MQ Server的日志记录

我正在使用@ https://hub.docker.com/r/_/rabbitmq/提到的步骤在docker中安装Rabbit mq服务器。

安装顺利,让我的Rabbitmq正常运行。

我找不到兔子mq日志。

如何控制和打开日志记录?

rabbitmq docker

1
推荐指数
1
解决办法
2296
查看次数

php-amqplib libarary在命令提示符下工作,但在浏览器上没有

我使用命令提示符运行时具有相同的代码.但是当我尝试使用浏览器运行它时,它会抛出错误.

我用过php-amqplib.我的代码简单地连接到RabbitMQ Queue并将消息发送到队列中.凭据是正确的,因为我能够在命令提示符下运行相同的文件.

注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符

注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符

注意:iconv_strlen():检测到338行/var/www/html/php-amqplib/vendor/symfony/polyfill-mbstring/Mbstring.php输入字符串中的非法字符

致命错误:未捕获PhpAmqpLib\Exception\AMQPRuntimeException:读取数据时出错.在/var/www/html/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:242收到而不是预期的7个字节堆栈跟踪:

0 /var/www/html/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(149):phpAmqpLib\Wire\IO\StreamIO-> read(7)

1 /var/www/html/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106):PhpAmqpLib\Wire\AMQPReader-> rawread(7)

2 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(508):

PhpAmqpLib \电线\ AMQPReader->读取(7)

3 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(555):

PhpAmqpLib \连接\ AbstractConnection-> wait_frame(0)

4 /var/www/html/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(217):

PhpAmqpLib\Connection\AbstractConnection-> wait_channel(0,0)

5 /var/www/html/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(328):

PhpAmqpLib \频道\ AbstractChannel-> next_frame(0)

6 /var/www/html/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(205):

第242行的/var/www/html/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php中的PhpAmqpLib\Channel\AbstractChannel-> w

rabbitmq php-amqplib

1
推荐指数
1
解决办法
1886
查看次数

为什么我在安装RabbitMQ时无法找到'rabbitmq.config'文件?

我正在运行Red Hat Enterprise Linux 7.2,我已经安装了RabbitMQ并进行了systemctl status rabbitmq-server演示

●rabbitmq-server.service - LSB:启用RabbitMQ代理提供的AMQP服务Loaded:loaded(/etc/rc.d/init.d/rabbitmq-server)
Active:active(running)自2016年3月17日星期三11: 25:56 JST; 1小时14分钟前文件:man:systemd-sysv-generator(8)

但是,当我想配置它时,我找不到该文件:find / -iname "rabbitmq.config"什么都不显示.

我有一个连接到的应用程序127.0.0.1:3000,但RabbitMQ监听的默认端口是5672,所以我想添加端口3000(希望它能解决问题)

rabbitmq

1
推荐指数
1
解决办法
5756
查看次数

是否有可能从鼠兔的每个队列消费?

我正在尝试设置一个程序,它将从RabbitMQ中的每个队列中消耗,并根据某些消息运行某些脚本.不幸的是,当添加消费者时,如果它遇到单个错误(即没有找到超时或队列),则整个通道都已死亡.此外,队列来来去去,所以它必须经常刷新队列列表.这甚至可能吗?到目前为止,这是我的代码.

import pika
import requests
import sys

try:
    host = sys.argv[1]
except:
    host = "localhost"

def get_queues(host="localhost", port=15672, user="guest", passwd="guest", virtual_host=None):
    url = 'http://%s:%s/api/queues/%s' % (host, port, virtual_host or '')
    response = requests.get(url, auth=(user, passwd))
    return response.json()

queues = get_queues(host)

def get_on_message(queue):
    def on_message(channel, method_frame, header_frame, body):
        print("message from", queue)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    return on_message

connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()

for queue in queues:
    print(channel.is_open)
    try:
        channel.basic_consume(get_on_message(queue["name"]), queue["name"])
        print("queue added",queue["name"])
    except Exception as e:
        print("queue failed",queue["name"])
        sys.exit()

try:
    channel.start_consuming()
except KeyboardInterrupt: …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

1
推荐指数
1
解决办法
414
查看次数

ES,CQRS消息传递流程

我试图了解ES + CQRS,可以使用技术堆栈。根据我的理解,流程应如下。

  1. UI向控制器(HTTP适配器)发送请求
  2. 控制器通过将请求对象作为参数来调用应用程序服务。
  3. 应用程序服务根据从控制器传递的请求对象创建命令。
  4. 应用程序服务将此命令传递给消息使用者。
  5. 消息使用者将命令发布到消息代理(RabbitMQ)
  6. 两个订户将监听上面的命令a。一个订户将使用命令从eventStore生成Aggregate,并将应用命令,而不是将生成的事件存储在事件存储中。b。另一个订户将在VIEW端,这将在View数据库/缓存中填充数据。

请暗示我的理解是正确的。

architecture events rabbitmq cqrs event-sourcing

1
推荐指数
1
解决办法
195
查看次数

芹菜一个经纪人多个队列和工人

我有一个调用的python文件tasks.py,其中我定义了4个单独的任务.我想配置芹菜以使用4个队列,因为每个队列将分配不同数量的工作人员.我正在阅读我应该使用route_task属性,但我尝试了几个选项而不是成功.

我正在关注这个doc celery route_tasks docs

我的目标是运行4个工作人员,每个任务一个,不要混合不同队列中不同工作人员的任务.这是可能的?这是一个很好的方法吗?

如果我做错了什么,我很乐意改变我的代码以使其工作

到目前为止,这是我的配置

tasks.py

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA',    routing_key='tasks.task_1'),
    Queue('queueB',    routing_key='tasks.task_2'),
    Queue('queueC',    routing_key='tasks.task_3'),
    Queue('queueD',    routing_key='tasks.task_4')
)


@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"


@app.task
def task_3():
    print "Task of level 3"


@app.task
def task_4():
    print "Task of level 4"
Run Code Online (Sandbox Code Playgroud)

为每个队列运行芹菜一名工人

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq celery

1
推荐指数
1
解决办法
2091
查看次数

windows RabbitMQ在sbin文件夹中没有rabbitmqadmin

我正在windows环境中使用RabbitMQ.我们还有用于windows的rabbitmqadmin吗?

我无法在rabbitmq的sbin文件夹中找到rabbitmqadmin.

请告诉我 .

rabbitmq rabbitmqadmin

1
推荐指数
1
解决办法
1449
查看次数

我可以在浏览器中使用amqplib吗?

我是AMQP / RabbitMQ新手,也是相对的Node.js新手。我可以在客户端使用amqplib NPM库吗?

我希望能够从Angular应用将消息直接推送到RabbitMQ。我已经使用Browserify模块化了很多客户端代码。我现在开始尝试RabbitMQ,并希望通过amqp协议将消息从浏览器直接推送到基于云的队列。

我已经通过NPM安装了amqplib并编写/粘贴了以下模块:

var amqp = require('amqplib/callback_api');

var push = function(){
    console.log('This is the CORE queue.pusher push function being triggered');

    var connString = 'amqp://username:pwd@blabla.rmq.cloudamqp.com/username';

    amqp.connect(connString, function(err, conn) {

        if (err){
            console.log("core queue.pusher push error %s", err);
        }else {
            conn.createChannel(function (err, ch) {
                var q = 'FatController';
                var msg = 'Hello World!';

                ch.assertQueue(q, {durable: false});
                // Note: on Node 6 Buffer.from(msg) should be used
                ch.sendToQueue(q, new Buffer(msg));
                console.log(" [x] Sent %s", msg);
            });

            setTimeout(function () {
                conn.close();
                process.exit(0) …
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq node.js angularjs node-amqp

1
推荐指数
1
解决办法
1189
查看次数