标签: message-queue

消息类型:消息应包含多少信息?

我们目前开始将事件从一个中央应用程序广播到其他可能感兴趣的消费者应用程序,并且我们团队的成员对于应该在已发布的消息中放入多少内容有不同的选择。

总体思路/架构如下:

  • 生产者应用程序中:
    • 用户与一些可以创建/修改/删除的实体(DDD 意义上的聚合根)进行交互
    • 根据正在发生的情况,引发域事件(例如:EntityXCreated、EntityYDeleted、EntityZTransferred 等...即不仅是 CRUD,而且主要是)
    • 引发的事件被翻译/转换为我们发送到 RabbitMQ Exchange 的消息
  • RabbitMQ中 (我们正在使用 RabbitMQ,但我相信问题实际上与技术无关)
    • 我们为每个消费应用程序定义一个队列
    • 绑定将交换连接到消费者队列(可能带有消息过滤)
  • 消费应用程序中
    • 应用程序使用并处理队列中的消息

基于企业集成模式,我们正在尝试为我们发布的消息定义规范格式,并且在两种方法之间犹豫:

  1. 极简消息/ event-store-ish:对于域模型发布的每个事件,生成一条消息,其中仅包含相关的聚合根部分(例如,当更新完成时,仅发布有关更新部分的信息聚合根的,或多或少匹配最终用户在使用我们的应用程序时经历的过程)

    • 优点

      • 消息尺寸小
      • 非常专业的消息类型
      • 靠近“领域事件”
    • 缺点

      • 如果无法保证交付顺序,则会出现问题(即,如果在创建消息之前收到更新消息怎么办?)
      • 消费者需要知道要订阅哪些消息类型(可能需要一个大列表/领域知识)
      • 如果消费者状态和生产者状态不同步怎么办?
      • 如何处理未来注册但不了解所有过去事件的新消费者
  2. 完全包含的幂等消息:对于域模型发布的每个事件,生成一条消息,其中包含该时间点聚合根的完整快照,因此实际上只处理两种消息“创建或更新”和“删除”(+元数据,如有必要,包含更具体的信息)

    • 优点

      • 幂等(声明性消息指出“这就是事实,尽你所能同步自己”)
      • 需要维护/处理的消息格式数量更少
      • 允许逐步纠正消费者的同步错误
      • 只要生成的消息遵循规范数据模型,消费者就会自动处理新的领域事件
    • 缺点

      • 更大的消息负载
      • 不太纯净

您会推荐一种方法而不是另一种方法吗?

我们还应该考虑另一种方法吗?

domain-driven-design message-queue integration-patterns rabbitmq

4
推荐指数
1
解决办法
1015
查看次数

使用 RabbitMQ 进行不同 Docker 容器之间的通信

我想在存储在不同 Docker 容器中的 2 个应用程序之间进行通信,这两个应用程序都属于同一 Docker 网络。我将为此使用消息队列(RabbitMQ)

我是否应该创建第 3 个 Docker 容器作为我的 RabbitMQ 服务器运行,然后为这 2 个特定容器创建一个通道?这样,如果我需要例如需要与其他 2 个通信的第三个应用程序,我可以创建更多频道?

问候!

message-queue rabbitmq docker microservices

4
推荐指数
1
解决办法
1345
查看次数

python rq worker并行执行任务

我不太了解 python rq,我刚刚开始学习它。

有一个task_a需要3分钟才能完成处理。

@job
def task_a():
    time.sleep(180)
    print('done processing task_a')

def call_3_times():
    task_a.delay()
    task_a.delay()
    task_a.delay()
Run Code Online (Sandbox Code Playgroud)

据我观察,task_a将从队列中一一执行。第一个呼叫结束后,再进行下一个呼叫,依此类推。总时间为 3 分钟 x 3 = 9 分钟

如何使每个task_aincall_3_times函数并行执行?所以所花费的时间少于 9 分钟,可能是 3 分 10 秒(只是一个例子,它可能会比这更快)。

也许我需要生成 3 个 rq 工作人员,是的,它确实工作得更快并且像并行一样。但如果我需要调用它 2000 次怎么办?我应该生成 2000 个 rq 工人吗?我的意思是,必须有更好的方法来做到这一点。

python message-queue python-rq

4
推荐指数
1
解决办法
9178
查看次数

发布消息时 RabbitMQ 访问被拒绝 - 如何授予用户特定队列的权限?

我在全新安装的 RabbitMQ 上设置了一个非常基本的队列“test_queue”,并创建了一个基本的非管理员用户“user”(我已为其授予与管理员帐户相同的虚拟主机访问权限)。

当我通过以下方式在命令行上发送测试消息时:

rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="hello, world" -u admin -p {admin password}
Run Code Online (Sandbox Code Playgroud)

它工作得很好。但是当我尝试使用基本用户时:

rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="hello, world" -u user -p {user password}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

*** 访问被拒绝:/api/exchanges/%2F/amq.default/publish

我搜索了如何为特定用户添加发布到消息队列的权限,但无法通过噪音找到解决方案。

message-queue rabbitmq

4
推荐指数
1
解决办法
7419
查看次数

C# 线程处理消息队列

我有以下要求 -

  1. 接收消息并将其排入队列的线程。
  2. 处理排队消息的线程。

现在,第二个线程必须始终处于活动状态 - 为此我使用了无限 while 循环,如下所示:

private AutoResetEvent messageReset;
private Queue<byte[]> messageQueue;

//thread 2 method
private void ProcessIncomingMessages()
{
 messageReset.WaitOne(); //wait for signal
 while(true)
 {
    if (messageQueue.Count > 0)
    {
        //processing messages
    }
  }
}
public void SubmitMessageForProcessing(byte[] message){
            messageQueue.Enqueue(message); //enqueue message

            // Release the thread
            messageReset.Set();
}
Run Code Online (Sandbox Code Playgroud)

现在,这个无限 while 循环使 CPU 利用率非常高。有没有什么办法可以降低CPU利用率

注意:我无法添加任何 thread.sleep 语句,因为传入消息将以最小的延迟显示在 UI 上。

c# multithreading message-queue

4
推荐指数
1
解决办法
6979
查看次数

Bull 队列尚未完成

我是 Bull 的新手。我尝试根据他们的文档代码运行 bull。流程正在启动,但我的工作尚未完成,或者不确定它是否触发完成事件?我不确定我在哪里犯了错误

下面附上我的代码

const Queue = require('bull');

const myFirstQueue = new Queue('my-first-queue', 
{
  redis: { 
      port: Config.redis.port, 
      host: Config.redis.host, 
      password: Config.redis.password 
  },
});



(async function ad() {
    const job = await myFirstQueue.add({
    foo: 'bar',
  });
})();

myFirstQueue.process(async (job, data) => {
  log.debug({ job, data }, 'Job data');
  let progress = 0;
  for (let i = 0; i < 10; i++) {
    await doSomething(data);
    progress += 10;
    job.progress(progress).catch(err => {
      log.debug({ err }, 'Job progress err');
    });
    log.debug({ progress …
Run Code Online (Sandbox Code Playgroud)

queue priority-queue message-queue job-queue node.js

4
推荐指数
1
解决办法
2万
查看次数

微服务:API 调用与消息传递。何时使用?

我知道消息系统是非阻塞且可扩展的,应该在微服务环境中使用。

我质疑的用例是:

想象一下,有一个管理仪表板客户端负责发送 API 请求来创建 Item 对象。有一个微服务提供 API 端点,该端点使用应存储项目的 MySQL 数据库。还有另一个微服务使用弹性搜索来进行文本搜索。

该管理仪表板客户端是否应该:

A.发送2个API调用;1 调用MySQL服务和另一个elasticsearch服务

或者

B. 发送消息到主题以供MySQL 服务和elasticsearch 服务使用?

考虑 A 或 B 时,各有哪些优缺点?

我认为当只有 2 个微服务正在使用这个主题时,这有点矫枉过正。此外,管理员创建 Item 对象的频率非常小。

rest messaging message-queue apache-kafka microservices

4
推荐指数
1
解决办法
4958
查看次数

可以使用面向消息的中间件代替MPI来协调分布式计算吗?

面向消息的中间件我指的是诸如高级消息队列协议之类的技术.

显然AMQP是一个与MPI不同的野兽,但我认为使用AMQP可以轻松实现以主从方式运行的分布式内存计算,让AMQP处理公平的工作分配给奴隶,因为它们完成了部分而不是管理队列在主人中明确地工作.

AMQP(如果你有成千上万一起工作的机器)的附加好处是,一台机器的死也不会失速在计算进度MPI_BcastS,因为AMQP可以简单地使用扇出代替MPI_Bcast,这将是不阻止整体计算的进度.

是否有任何AMQP用于分布式计算中的任务协调的示例?

更新: Gearman为容错分布式计算提供了一种非常好的方法.

language-agnostic parallel-processing message-queue amqp

3
推荐指数
1
解决办法
1005
查看次数

otp gen_server的info,call,cast message queue中有不同的优先级吗?

在编写代码时,我会问自己应该使用call哪种类型的消息,应该使用哪种类型的消息info

在这个问题下面,还有另一个长期怀疑info, cast, call消息之间是否存在优先级差异?这3种消息是否共享同一队列?

erlang message-queue gen-server

3
推荐指数
1
解决办法
796
查看次数

C中的消息队列:实现2路通信

我是学生和C的初学者.我想在C linux中使用消息队列实现双向通信.我需要两个队列或只需要一个才能完成这项任务吗?

另外我想知道我可以将数据(以代码显示)发送到另一个进程,或者我需要将其声明为字符数组.

typedef struct msg1
{
    int mlen;
    char *data;
}M1;

typedef struct msgbuf
{
    long    mtype;
    M1      *m;
} message_buf;
Run Code Online (Sandbox Code Playgroud)

提前致谢 :)

c linux pointers ipc message-queue

3
推荐指数
1
解决办法
2万
查看次数