我正在尝试在 django 视图中异步运行任务。为此,我使用 celery 和rabbitmq。通过遵循小规模上下文指南,我在模块(servicenow.py)中将任务定义为 -
app = Celery('servicenow',broker='amqp://username:password@localhost:15672')
.
.
@app.task
def get_ITARAS_dump(self):
.
.
self.update_state(state='PROGRESS',meta={'current':i,'total':len(taskList)})
Run Code Online (Sandbox Code Playgroud)
我的rabbitmq服务器在brew服务中运行
之后我尝试启动一个工作实例,
celery -A servicenow worker -l info
然后我的错误消息为 -
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/bin/celery", line 11, in <module>
sys.exit(main())
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/__main__.py", line 30, in main
main()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/bin/celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/bin/celery.py", line 793, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline
return self.handle_argv(self.prog_name, argv[1:])
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/bin/celery.py", line 785, in handle_argv …
Run Code Online (Sandbox Code Playgroud) 通过 Symfony 组件调度事件Messenger
和通过 Symfony 组件调度事件之间的基本区别是什么EventDispatcher
?
所以我试图了解队列解决了哪些实际问题。通过阅读谷歌上的所有信息,我得到了高水平的信息。
因此,我正在研究 A 公司的架构,他们对于作业队列有不同的用例,例如
为什么要稍后处理呢?
这是我最好的猜测...
假设这是一个有效的用例,那么添加更多服务器来处理更多“事物”难道没有意义吗?是因为添加更多服务器比使用队列成本更高并且稍微牺牲响应时间吗?
根据我的用例示例,队列还能为它们解决哪些其他问题?
我正在构建一个 django web 应用程序,我需要在网页上实时传输一些股票市场交易。为了做到这一点,我正在寻找各种方法,我发现了 Pusher 和 RabbitMQ。
使用 RabbitMQ,我只需将消息发送到 RMQ 并从 Django 使用它们,以便将它们显示在网页上。在寻找其他解决方案时,我还发现了 Pusher。对我来说,不清楚的是两者在技术上的区别。我不明白在哪里使用 Rabbit,在哪里使用 Pusher,有人可以向我解释一下它们有什么不同吗?提前致谢!
我仔细研究了非常类似的 stackoverflow 问题,并将我的代码更改为现在的样子。我仍然收到cannot resolve broker hostname
错误。如果我只是使用amqp:user:mypass@locahost:5672
我会得到connection refused
# docker-compoose.yml
version: "3.8"
services:
broker:
image: rabbitmq:3-management-alpine
hostname: rabbit1
environment: &env
RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
RABBITMQ_DEFAULT_USER: "user"
RABBITMQ_DEFAULT_PASS: "password"
RABBITMQ_DEFAULT_VHOST: "/"
CELERY_BROKER_URL: "amqp://user:password@rabbit1:5672"
ports:
- "5672:5672"
- "15672:15672"
expose:
- "5672"
networks:
- webnet
volumes:
- .:/home/user
worker:
build:
context: .
dockerfile: ./worker/Dockerfile
command: ["celery", "worker", "--app=worker.tasks.app", "--loglevel=INFO"]
environment:
<<: *env
links:
- broker
depends_on:
- broker
networks:
- webnet
volumes:
- .:/home/user
networks:
webnet:
Run Code Online (Sandbox Code Playgroud)
# Dockerfile
FROM python:3.7-slim …
Run Code Online (Sandbox Code Playgroud) RabbitMQ 最近开始崩溃。我怀疑我不小心更新了一些东西。在 MacOS 10.15.7 上,brew list rabbitmq
在/usr/local/homebrew/Cellar/rabbitmq/3.8.14
.
当我尝试运行rabbitmq-server
或任何rabbitmqctl
命令(例如 )时rabbitmqctl version
,它崩溃并显示:
{"init terminating in do_boot",{load_failed,[supervisor,logger_backend,logger_simple_h,logger_config,gen_event,logger_server,kernel,file_io_server,file_server,filename,file,erl_parse,erl_lint,application_controller,error_logger,lists,application,application_master,code,error_handler,logger,logger_filters,heart,gen_server,gen,ets,proc_lib,erl_eval,code_server]}}
Run Code Online (Sandbox Code Playgroud)
卸载并重新安装没有帮助。
该错误与此处描述的错误类似,其中建议这样做:
某些模块无法加载。您运行的 Erlang 版本很可能比 RabbitMQ 支持的最旧版本更旧
但根据文档,RabbitMQ3.8.14
需要 Erlang22.3
来23.x
进行erl --version
打印Erlang/OTP 23
。
关于出了什么问题的任何线索吗?
我正在使用 NestJS 应用程序来使用 RabbitMQ 队列。无论顺序如何,每条消息都可以处理,所以我想知道为同一队列声明新消费者的最佳实践是什么。
预期行为:队列由该服务处理,该服务使用多个消费者
队列:[1,2,3,4,5,6,...N];
在nestJS中,您可以使用@RabbitSubscribe
装饰器来分配一个函数来处理数据。我想要做的可以通过简单地使用装饰器复制(并重命名)该函数来实现,因此该函数也将被调用来处理来自队列的数据
@RabbitSubscribe({
...
queue: 'my-queue',
})
async firstSubscriber(data){
// 1, 3, 5...
}
@RabbitSubscribe({
...
queue: 'my-queue',
})
async secondSubscriber(data){
// 2, 4, 6...
}
Run Code Online (Sandbox Code Playgroud)
我知道我可以复制项目并水平扩展,但我更喜欢在同一流程中执行此操作。
我如何声明订阅者以编程方式获得相同的行为,以便我可以通过更多并发处理来处理数据?
我正在尝试为我的项目开发测试,我有一个连接到rabbitmq并消耗队列的文件,但我在测试它时遇到了问题
const amqp = require('amqplib/callback_api');
const rabbitConsumer = (io) => {
setTimeout(() => {
amqp.connect('amqp://rabbitmq', (error0, connection) => {
if (error0) {
throw error0;
}
connection.createChannel((error1, channel) => {
if (error1) {
throw error1;
}
const queue = 'message';
channel.assertQueue(queue, {
durable: false,
});
console.log(' [*] Waiting for message', queue);
channel.consume(
queue,
(data) => {
console.log(' [x] Received data:', data.content.toString('utf-8'));
io.emit('sendMessage', data.content.toString('utf-8'));
},
{
noAck: true,
}
);
});
});
}, 10000);
};
module.exports = rabbitConsumer;
Run Code Online (Sandbox Code Playgroud)
可以测试这个文件吗?我如何使用 JEST 或任何其他库来做到这一点?
我尝试在更新后启动rabbitmq-server并收到此错误。
2023-07-18 14:47:49.621801+03:00 [error] <0.234.0> Feature flags: `classic_mirrored_queue_version`: required feature flag not enabled! It must be enabled before upgrading RabbitMQ.
2023-07-18 14:47:49.627876+03:00 [error] <0.234.0> Failed to initialize feature flags registry: {disabled_required_feature_flag,
2023-07-18 14:47:49.627876+03:00 [error] <0.234.0> classic_mirrored_queue_version}
BOOT FAILED
===========
Error during startup: {error,failed_to_initialize_feature_flags_registry}
2023-07-18 14:47:49.633989+03:00 [error] <0.234.0>
2023-07-18 14:47:49.633989+03:00 [error] <0.234.0> BOOT FAILED
2023-07-18 14:47:49.633989+03:00 [error] <0.234.0> ===========
2023-07-18 14:47:49.633989+03:00 [error] <0.234.0> Error during startup: {error,failed_to_initialize_feature_flags_registry}
2023-07-18 14:47:49.633989+03:00 [error] <0.234.0>
2023-07-18 14:47:50.635088+03:00 [error] <0.233.0> crasher:
2023-07-18 14:47:50.635088+03:00 [error] <0.233.0> initial …
Run Code Online (Sandbox Code Playgroud) 我使用Spring boot和RabbitMQ为多个生产者和消费者开发了一个应用程序.应用程序工作正常没有任何问题,但我仍然想进行单元测试和集成测试.我浏览谷歌但没有运气没有得到坚实的用例来测试Spring boot和rabbitMQ在一起.
所以我想知道哪种工具最适合测试Spring Boot和RabbitMQ(至少有一个如何编写测试用例的提示是可观的)?我看到类似的stackoverflow帖子,但没有得到解决方案.你的帮助应该感激.
rabbitmq ×10
celery ×2
django ×2
node.js ×2
amazon-sqs ×1
amazon-swf ×1
amqp ×1
docker ×1
nestjs ×1
pusher ×1
python ×1
python-3.x ×1
redis ×1
spring-boot ×1
symfony ×1
tdd ×1
ts-jest ×1
unit-testing ×1