标签: message-queue

使用 JMS 关闭持久消费者的最佳方法是什么?

我在 JMS 主题上有一个持久消费者。我设置了客户端 ID,我可以看到它作为持久消费者列在队列中。

当我推出代码时,服务器会重新启动,但我想将重新启动时丢失的消息排队(因此是持久的)。干净地关闭消息使用者的正确方法是什么,这样您就可以关闭它,但仍然让队列知道为您缓冲消息

destination = session.createTopic("beacons");
messageConsumer = session.createDurableSubscriber(destination, clientID);
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?或者这会告诉队列您不再希望在重新连接时传递消息?

messageConsumer.close 
Run Code Online (Sandbox Code Playgroud)

简而言之,我希望能够重新启动我的服务,而不会丢失我订阅的主题的消息,谢谢!

java jms message-queue

3
推荐指数
1
解决办法
1475
查看次数

在 C 中使用单个消息队列是否可以实现双向通信

我希望服务器向客户端发送一些消息,并让客户端确认它。我被分配了这个任务。我可以在 C linux 中使用单个消息队列来完成它还是我需要创建两个?

谢谢 :)

c linux ipc message-queue

3
推荐指数
1
解决办法
2504
查看次数

PHP如何在RabbitMQ中取消消费者?

我有下一个代码:

<?php

// callback function for recive the message and canceling consumer
function consumer(\AMQPEnvelope $envelope, \AMQPQueue $queue)
{
    $queue->ack($envelope->getDeliveryTag());
    $queue->cancel($envelope->getCorrelationId());
    echo "Message was recived and consumer will be canceled by consumer tag: {$envelope->getCorrelationId()}\n";
}

// generating uniqie exchange and queue
$correlationId = uniqid(str_replace('.', '', (string)microtime(TRUE)) . '_');
$queueName = "databus_response_{$correlationId}";
$consumerTag = "consumer_tag_{$correlationId}";

// establesh connection
$connection = new \AMQPConnection(array('host'=>'127.0.0.1', 'user'=>'guest', 'password'=>'guest'));
$connection->connect();
$channel = new \AMQPChannel($connection);

// declare exchange
$exchange = new \AMQPExchange($channel);
$exchange->setFlags(AMQP_AUTODELETE);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setName($queueName);
$exchange->declareExchange();

// declare …
Run Code Online (Sandbox Code Playgroud)

php message-queue command-line-interface rabbitmq

3
推荐指数
1
解决办法
3392
查看次数

为什么 errno 设置为 22:mq_open() POSIX


我在尝试使用 C 在 POSIX 中创建 message_queue 时收到 errno 22。据我所知,通过与网络上可用的示例代码进行比较,我已经正确设置了参数。

这是一个片段:

    int open_flags;
    mqd_t mqfd;
    int bytes_per_msg;
    struct mq_attr attr;
    unsigned int* msgbuff;

    printf("from 1 to 400, what is N? : ");
    scanf("%d", &n);
    bytes_per_msg = (n + 1) * (sizeof(unsigned int));
    msgbuff = (unsigned int*)malloc(bytes_per_msg);

    open_flags = O_CREAT|O_RDWR;
    attr.mq_maxmsg = n;
    attr.mq_msgsize = bytes_per_msg;
    attr.mq_flags   = 0;


    mqfd = mq_open("/myqueue", open_flags, 0666, &attr);

    if(mqfd == -1){
        printf("queue creation failed, ERRNO: %d\n",errno);
    }
Run Code Online (Sandbox Code Playgroud)

编辑:我很抱歉没有更清楚。错误 22 是无效参数。--错误编号的含义可以在errno.h中找到

c posix message-queue

3
推荐指数
1
解决办法
7271
查看次数

mq_open 错误号 13 权限被拒绝

我在尝试使用 mq_open() 调用创建 posix mq 时遇到了权限问题。我确实合并了此处提到的更改mq_open Permission denied我查看了其他相关帖子,例如https://groups.google.com/forum/#!topic/comp.unix.programmer/hnTZf6aPpbE但这也指向同一件事。

此外,在尝试编译时,我遇到了未识别 mq 调用的错误,并且在线显示通过在 gcc 中添加 -lrt 进行编译,post 能够编译,提到它,因为我不完全了解它的基本原理并且没有通过阅读帖子来理解它:)

gcc server_mq.c -lrt -o 服务器

错误编号为 13

哦,天哪,mqd 出了点问题!没有权限

#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <errno.h>
#include <string.h>

#include "client_server.h"

#define PATH "/tmp/servermq"

int main(void)
{
    mqd_t mqd;
    mode_t omask;
    omask = umask(0);
    int flags = O_RDWR | O_CREAT | O_EXCL;
    struct mq_attr attr, *attrp;

    attr.mq_maxmsg = 5;
    attr.mq_msgsize = 1024;

    attrp = &attr;

    mqd = …
Run Code Online (Sandbox Code Playgroud)

c linux posix message-queue

3
推荐指数
1
解决办法
4294
查看次数

JMS 持久性和持久性

我目前正在阅读JMS一些文章,在阅读了几篇文章之后,我对使用非持久消息的持久订阅有点困惑。让我们从:http : //www2.sys-con.com/itsg/virtualcd/java/archives/0604/chappell/index.html - 它说:

  • 如果非持久消息是为断开连接的持久订阅者准备的,消息服务器会将消息保存到磁盘,就好像它是持久消息一样。在这种情况下,持久消息和非持久消息之间的区别很微妙,但非常重要。对于非持久消息,JMS 提供程序可能会在它有机会代表断开连接的持久订阅者将消息写出到磁盘之前失败。消息可能会丢失

另一个来源:http : //openmessaging.blogspot.com/2009/04/durable-messages-and-persistent.html说的完全不同:

  • 如果此主题上有任何持久订阅,则会将消息的副本发送给那些处于活动状态的持久订阅者。对于那些非活动的持久订阅,消息的副本保存在内存中,并在它们下次变为活动状态时发送给它们。

    如果代理重新启动,此保存的消息将丢失。由于非持久消息未保存在磁盘上,因此代理重启意味着任何尚未收到消息的非活动持久订阅都将错过消息。

那么,真相是什么:)?

java jms message-queue

3
推荐指数
1
解决办法
1640
查看次数

如何中止公牛队列中的特定任务?

根据这个包https://github.com/OptimalBits/bull是否有可能中止“等待队列”中的某个任务?我的用例如下:

我有一个 mongodb 集合“用户”和一个集合“友谊”,我在其中存储了两个用户的姓名和头像。所以我只需要一个查询来获取某个用户的好友列表。当用户更改他的头像时,我必须更新“friendship”集合中该用户内的所有文档。这是一个性能不重要的操作,因为我希望它在后台执行,并且一致性对于这个用例并不重要。但是当用户在短时间内多次更新他的头像时,我想取消所有引用旧任务(用于更新友谊集合),除了最新的。这与公牛可能吗?

提前致谢,我将不胜感激有关此的所有信息。

priority-queue message-queue job-queue mongodb node.js

3
推荐指数
1
解决办法
4127
查看次数

是否可以使用 AVRO 为 Kafka 中的 Google Pub/Sub 主题定义模式?

据我所知,我们可以在 Kafka 上定义 AVRO 模式,使用该模式定义的主题将只接受与该模式匹配的数据。在接受进入队列之前验证数据结构非常有用。

Google Pub/Sub 中有类似的东西吗?

message-queue avro apache-kafka google-cloud-pubsub

3
推荐指数
1
解决办法
2621
查看次数

将 Amazon SQS 用于接收相同消息的多个消费者

我有一个主要应用程序向 SQS 队列发送消息,并希望 4 个消费者应用程序使用相同的消息并按照他们想要的方式处理它

我不确定为此目的使用什么队列架构。

我看到了标准SQS、SQS FIFO、(SQS + SNSTopic) 和 Kenesis 的选项

对于我想要的功能,似乎(SQS + SNS Topic) 或 Kenesis 都是可行的方法。

但我也有一个关于标准 SQS 和 SQS FIFO 的问题 - 如果我使用 SQS FIFO 或标准 SQS,是否所有消费者都可以获得相同的消息?

我想我对所有选项感到困惑,并且对队列上的所有可用信息感到不知所措,但仍然对选择哪种架构感到困惑

主要信息来源是亚马逊文档和https://www.schibsted.pl/blog/choosing-best-aws-messaging-service/

我在 stackoverflow 上遇到的一些问题:

Link_1这篇文章回答了在队列中使用多个消费者的问题,但不确定它是否解决了多个消费者消费的相同消息的问题

Link_2 这一个回答了为什么 Kenesis 可以用于我的场景

Helpful_Info我使用这篇文章只是为了了解差异

我真的很感激这方面的一些帮助。我正在尝试尽可能多地阅读,但如果有人能帮助我做出正确的决定,我将不胜感激

message-queue amazon-sqs amazon-web-services microservices

3
推荐指数
2
解决办法
3312
查看次数

AWS SQS:可见性超时到期后,FIFO 队列中的消息顺序是什么?

在 AWS SQS FIFO 的队列中;当已读消息的可见性超时时,消息将位于队列的哪个位置?

例如:

  • 我在队列中有这些消息:'[A, B, C, D]'(顺序:A first in)
  • 我从队列中读取了一条消息,因此收到消息“A”
  • 消息“A”的可见性超时到期,消费者可以再次使用它

消息的新顺序是什么?

  • a) [A、B、C、D]
  • b) [B、C、D、A]

timeout message-queue fifo amazon-sqs amazon-web-services

3
推荐指数
1
解决办法
1058
查看次数