标签: message-queue

NATS 发布者可以将一条消息发送到多个队列吗?

我正在构建一个系统,其中两个不同的实体需要处理来自同一源的消息(以不同的方式 - 例如,一个实体将记录所有消息,而另一个实体可能想要聚合数据)。

理想情况下,每个实体在性能和弹性方面都是完全可扩展的,因此我们有多个发布者、多个日志订阅者和多个聚合订阅者,但每个发布者生成的每条消息仍然由一个日志订阅者和一个聚合订阅者处理。

使用 AMQP,我们可以通过发布到扇出交换来实现此目的,该交换将消息分发到两个队列,其中每个队列有许多订阅者。据我了解,只需让所有订阅者根据其角色使用两个不同的“队列组名称”来监听同一“主题”,就可以在 NATS 中实现相同的行为。

在这种情况下,发送给主题的消息将被传递给来自每个队列组的一个订阅者,即每条消息将被精确地传递n次,n是不同队列组的数量而不是订阅者的数量。它是否正确?

messaging message-queue publish-subscribe nats.io

3
推荐指数
1
解决办法
2645
查看次数

访问IPC_PRIVATE生成的key_t

我正在尝试同步并使一些线程/进程为项目进行通信,并且理想情况下我希望访问它们之间的一些共享内存块,而不会让它们与其他进程/资源发生冲突。

我知道 IPC_PRIVATE 在调用shmget()创建它时会生成一个唯一的密钥,但是如果我随后需要该密钥在其他进程中的某个位置打开该区域,我如何访问该生成的key_t值以便将其发送到其他进程?

我目前正在通过 IPC 消息队列发送数据,因此我可以发送 shmid 值,但据我所知,这不起作用,因为 shmid 值对于每个进程都是唯一的。

我没有其他选择,只能尝试ftok()一些随机文件吗?我是否必须为我想要创建的不同共享内存的每个块选择不同的文件?

感谢您的时间。

c ipc message-queue shared-memory

3
推荐指数
1
解决办法
3312
查看次数

当我尝试从两个不同的终端选项卡轮询相同的 Amazon SQS 时,我在两个选项卡中都没有收到相同的消息

我创建了一个 Amazon SNS 主题。我有一个订阅该主题的 Amazon SQS 队列。

我创建了一个默认的 SQS 队列(不是 FIFO 队列)。

我正在使用sqs-consumerAPI 来长轮询队列SQS

const app = Consumer.create({
    queueUrl: 'https://sqs.us-east-2.amazonaws.com/xxxxxxxxxxxx/xxxxxxxxxxx',
    handleMessage: async (message) => {

        console.log(message);
    },
    sqs: sqs//new AWS.SQS({apiVersion: '2012-11-05'})
});

app.on('error', (err) => {
    console.error(err.message);
});

app.on('processing_error', (err) => {
    console.error(err.message);
});

app.on('timeout_error', (err) => {
    console.error(err.message);
});

app.start();
Run Code Online (Sandbox Code Playgroud)

当我js通过执行以下操作从单个终端运行这段文件时node sqs_client.js,一切都工作得很好,消息也按正确的顺序发送。

但是,如果打开另一个终端窗口并运行node sqs_client.js,则传入消息的顺序变得非常随机。较新的消息可能以任何顺序进入第一终端窗口或第二终端窗口。

为什么会这样呢?有什么方法可以防止这种情况,以便我可以同时在两个终端窗口中收到相同的消息。

message-queue amazon-sqs amazon-web-services long-polling amazon-sns

3
推荐指数
1
解决办法
1248
查看次数

轮询 AWS SQS 队列以获取具有特定属性的消息

我已经使用 AWS SQS 设置了一个标准队列,我想轮询它queue以查找包含特定 的消息attribute,最好使用boto3python 中的库。我知道有一个从队列中轮询消息的boto3方法。recieve_message()但是,我只想获取那些包含特定属性的消息。一种幼稚的方法是迭代输出receive_message()并检查 a messagein是否receive_message()包含attribute,但我想知道是否还有其他解决方案来解决这个问题。

message-queue amazon-sqs amazon-web-services

3
推荐指数
1
解决办法
5448
查看次数

Python Flask:如何将请求添加到队列并在执行之前重新排列队列中的任务

一些上下文:

想要编写一个接受客户端门票的算法。根据某些约束对它们进行排序,处理它们,并将结果回复给客户端。

我做了一些研究,虽然 python 的 REST API 是一个好主意。但当我探索它时,我发现,它通常是为了一次处理一个请求而构建的。

有没有办法将任务(REST API 请求)添加到队列中,对它们进行排序并与工作人员一起执行它们,并在处理完成后回复客户端?

python message-queue redis flask flask-restful

3
推荐指数
1
解决办法
1万
查看次数

如何通过一对一的消息处理来实施 Amazon SQS (fifo)-lambda

我有一个用例,其中有一个带有 lambda 函数的 Amazon SQS fifo 队列。我需要确保 fifo 仅在前一个 lambda 执行完成时才触发 lambda(事件也按顺序出现)。从 aws 文档来看,fifo 支持一次处理,但它没有在任何地方提到它不会在 lambda 上推送更多事件,直到第一条消息完全处理为止

我需要确保仅当 lambda 函数完全处理前一条消息时才处理下一条消息。

有没有办法确保当消息 1 完全由 lambda 处理时,消息 2 只由 lambda 处理?

queue message-queue amazon-sqs amazon-web-services aws-lambda

3
推荐指数
1
解决办法
2253
查看次数

android中消息队列内存泄漏?

LeakCanary 检测到我的 MainActivity.java 中存在内存泄漏。这是我的泄漏痕迹。

\n
\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\n\xe2\x94\x82 GC Root: Input or output parameters in native code\n\xe2\x94\x82\n\xe2\x94\x9c\xe2\x94\x80 android.os.MessageQueue instance\n\xe2\x94\x82    Leaking: NO (MessageQueue#mQuitting is false)\n\xe2\x94\x82    HandlerThread: "main"\n\xe2\x94\x82    \xe2\x86\x93 MessageQueue.mMessages\n\xe2\x94\x82                   ~~~~~~~~~\n\xe2\x94\x9c\xe2\x94\x80 android.os.Message instance\n\xe2\x94\x82    Leaking: UNKNOWN\n\xe2\x94\x82    Retaining 14.2 kB in 348 objects\n\xe2\x94\x82    Message.what = 0\n\xe2\x94\x82    Message.when = 37524601 (681 ms after heap dump)\n\xe2\x94\x82    Message.obj = null\n\xe2\x94\x82    Message.callback = instance @319985112 of com.application.app.\n\xe2\x94\x82    MainActivity$$ExternalSyntheticLambda2\n\xe2\x94\x82    \xe2\x86\x93 Message.callback\n\xe2\x94\x82              ~~~~~~~~\n\xe2\x94\x9c\xe2\x94\x80 com.application.app.MainActivity$$ExternalSyntheticLambda2 instance\n\xe2\x94\x82    Leaking: UNKNOWN\n\xe2\x94\x82    Retaining 12 B in 1 objects\n\xe2\x94\x82    f$0 instance of com.application.app.MainActivity with mDestroyed =\n\xe2\x94\x82    true\n\xe2\x94\x82    \xe2\x86\x93 …
Run Code Online (Sandbox Code Playgroud)

java android memory-leaks message-queue leakcanary

3
推荐指数
1
解决办法
1404
查看次数

从java代码获取rabbitmq队列中的消息计数

我试图检查给定的rabbitmq队列是否为空.为此,我试图使用:

channel.queueDeclarePassive(queueName).getMessageCount().
Run Code Online (Sandbox Code Playgroud)

使用这个我总是得到0作为答案,不管显示的消息数量rabbitmqctl list_queues.据我所搜索,没有可用的API.

我想要给出相同的答案rabbitmqctl list_queues.请建议一种方法来做到这一点.

java message-queue rabbitmq

2
推荐指数
1
解决办法
7156
查看次数

Executor框架和消息队列(如JMS)之间的区别

消息队列主要用于在服务器上执行异步任务,我最近阅读了有关Executor框架的内容,它也执行相同的操作,生成并管理线程以执行异步任务.谁能告诉我两者之间的区别?

java asynchronous message-queue executorservice

2
推荐指数
1
解决办法
744
查看次数

Amazon批处理作业进程队列

我正在使用AWS EC2实例进行生物信息学工作.我有一个(~1000)个大文件,应该使用EC2实例上的脚本进行处理,结果应该上传回S3存储桶.我想将作业(文件)分发给多个EC2实例,优先以现货价格开始.

我需要的是一个简单易用的排队系统(可能是AWS SQS或其他东西),它可以将作业分配到实例并在实例失败时重新启动作业(由于现货价格过高或其他原因).我研究过AWS SQS示例,但这些示例过于先进,通常涉及自动扩展和复杂的消息生成应用程序.

有人能从概念上指出如何以最简单的方式解决这个问题吗?AWS SQS这个简单应用的任何示例?应该如何启动一堆实例以及如何告诉他们收听队列?

对于每个输入文件,我的工作流程基本上都是这样

aws s3 cp s3://mybucket/file localFile ## Possibly streaming the file without copy
work.py --input localFile --output outputFile
aws s3 cp outputFile s3://mybucket/output/outputFile
Run Code Online (Sandbox Code Playgroud)

queue message-queue amazon-ec2 amazon-web-services

2
推荐指数
2
解决办法
3840
查看次数