我们目前开始将事件从一个中央应用程序广播到其他可能感兴趣的消费者应用程序,并且我们团队的成员对于应该在已发布的消息中放入多少内容有不同的选择。
总体思路/架构如下:
基于企业集成模式,我们正在尝试为我们发布的消息定义规范格式,并且在两种方法之间犹豫:
极简消息/ event-store-ish:对于域模型发布的每个事件,生成一条消息,其中仅包含相关的聚合根部分(例如,当更新完成时,仅发布有关更新部分的信息聚合根的,或多或少匹配最终用户在使用我们的应用程序时经历的过程)
优点
缺点
完全包含的幂等消息:对于域模型发布的每个事件,生成一条消息,其中包含该时间点聚合根的完整快照,因此实际上只处理两种消息“创建或更新”和“删除”(+元数据,如有必要,包含更具体的信息)
优点
缺点
您会推荐一种方法而不是另一种方法吗?
我们还应该考虑另一种方法吗?
domain-driven-design message-queue integration-patterns rabbitmq
我想在存储在不同 Docker 容器中的 2 个应用程序之间进行通信,这两个应用程序都属于同一 Docker 网络。我将为此使用消息队列(RabbitMQ)
我是否应该创建第 3 个 Docker 容器作为我的 RabbitMQ 服务器运行,然后为这 2 个特定容器创建一个通道?这样,如果我需要例如需要与其他 2 个通信的第三个应用程序,我可以创建更多频道?
问候!
我不太了解 python rq,我刚刚开始学习它。
有一个task_a需要3分钟才能完成处理。
@job
def task_a():
time.sleep(180)
print('done processing task_a')
def call_3_times():
task_a.delay()
task_a.delay()
task_a.delay()
Run Code Online (Sandbox Code Playgroud)
据我观察,task_a将从队列中一一执行。第一个呼叫结束后,再进行下一个呼叫,依此类推。总时间为 3 分钟 x 3 = 9 分钟
如何使每个task_aincall_3_times函数并行执行?所以所花费的时间少于 9 分钟,可能是 3 分 10 秒(只是一个例子,它可能会比这更快)。
也许我需要生成 3 个 rq 工作人员,是的,它确实工作得更快并且像并行一样。但如果我需要调用它 2000 次怎么办?我应该生成 2000 个 rq 工人吗?我的意思是,必须有更好的方法来做到这一点。
我在全新安装的 RabbitMQ 上设置了一个非常基本的队列“test_queue”,并创建了一个基本的非管理员用户“user”(我已为其授予与管理员帐户相同的虚拟主机访问权限)。
当我通过以下方式在命令行上发送测试消息时:
rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="hello, world" -u admin -p {admin password}
Run Code Online (Sandbox Code Playgroud)
它工作得很好。但是当我尝试使用基本用户时:
rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="hello, world" -u user -p {user password}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
*** 访问被拒绝:/api/exchanges/%2F/amq.default/publish
我搜索了如何为特定用户添加发布到消息队列的权限,但无法通过噪音找到解决方案。
我有以下要求 -
现在,第二个线程必须始终处于活动状态 - 为此我使用了无限 while 循环,如下所示:
private AutoResetEvent messageReset;
private Queue<byte[]> messageQueue;
//thread 2 method
private void ProcessIncomingMessages()
{
messageReset.WaitOne(); //wait for signal
while(true)
{
if (messageQueue.Count > 0)
{
//processing messages
}
}
}
public void SubmitMessageForProcessing(byte[] message){
messageQueue.Enqueue(message); //enqueue message
// Release the thread
messageReset.Set();
}
Run Code Online (Sandbox Code Playgroud)
现在,这个无限 while 循环使 CPU 利用率非常高。有没有什么办法可以降低CPU利用率
注意:我无法添加任何 thread.sleep 语句,因为传入消息将以最小的延迟显示在 UI 上。
我是 Bull 的新手。我尝试根据他们的文档代码运行 bull。流程正在启动,但我的工作尚未完成,或者不确定它是否触发完成事件?我不确定我在哪里犯了错误
下面附上我的代码
const Queue = require('bull');
const myFirstQueue = new Queue('my-first-queue',
{
redis: {
port: Config.redis.port,
host: Config.redis.host,
password: Config.redis.password
},
});
(async function ad() {
const job = await myFirstQueue.add({
foo: 'bar',
});
})();
myFirstQueue.process(async (job, data) => {
log.debug({ job, data }, 'Job data');
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
log.debug({ progress …Run Code Online (Sandbox Code Playgroud) 我知道消息系统是非阻塞且可扩展的,应该在微服务环境中使用。
我质疑的用例是:
想象一下,有一个管理仪表板客户端负责发送 API 请求来创建 Item 对象。有一个微服务提供 API 端点,该端点使用应存储项目的 MySQL 数据库。还有另一个微服务使用弹性搜索来进行文本搜索。
该管理仪表板客户端是否应该:
A.发送2个API调用;1 调用MySQL服务和另一个elasticsearch服务
或者
B. 发送消息到主题以供MySQL 服务和elasticsearch 服务使用?
考虑 A 或 B 时,各有哪些优缺点?
我认为当只有 2 个微服务正在使用这个主题时,这有点矫枉过正。此外,管理员创建 Item 对象的频率非常小。
面向消息的中间件我指的是诸如高级消息队列协议之类的技术.
显然AMQP是一个与MPI不同的野兽,但我认为使用AMQP可以轻松实现以主从方式运行的分布式内存计算,让AMQP处理公平的工作分配给奴隶,因为它们完成了部分而不是管理队列在主人中明确地工作.
AMQP(如果你有成千上万一起工作的机器)的附加好处是,一台机器的死也不会失速在计算进度MPI_BcastS,因为AMQP可以简单地使用扇出代替MPI_Bcast,这将是不阻止整体计算的进度.
是否有任何AMQP用于分布式计算中的任务协调的示例?
更新: Gearman为容错分布式计算提供了一种非常好的方法.
在编写代码时,我会问自己应该使用call哪种类型的消息,应该使用哪种类型的消息info?
在这个问题下面,还有另一个长期怀疑info, cast, call消息之间是否存在优先级差异?这3种消息是否共享同一队列?
我是学生和C的初学者.我想在C linux中使用消息队列实现双向通信.我需要两个队列或只需要一个才能完成这项任务吗?
另外我想知道我可以将数据(以代码显示)发送到另一个进程,或者我需要将其声明为字符数组.
typedef struct msg1
{
int mlen;
char *data;
}M1;
typedef struct msgbuf
{
long mtype;
M1 *m;
} message_buf;
Run Code Online (Sandbox Code Playgroud)
提前致谢 :)