我想使用Amazon SQS作为Celery支持的经纪人.有关于Kombu的SQS传输实现,Celery依赖它.但是没有足够的文档来使用它,所以我找不到如何在Celery上配置SQS.是否有人成功在Celery上配置SQS?
我有一个用Django编写的REST API,其中和端点在发布芹菜任务时对其进行排队.响应包含任务ID,我想用它来测试任务是否已创建并获得结果.所以,我想做的事情如下:
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
Run Code Online (Sandbox Code Playgroud)
我显然不想经营一个芹菜工人来进行单元测试,我希望以某种方式嘲笑它.我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过经纪人,防止我使用AsyncResult通过其ID来获得任务(如说在这里).
通过芹菜和kombu文档,我发现有一个内存传输单元测试,这将做我正在寻找的.我尝试覆盖BROKER_URL
设置以在测试中使用它:
@override_settings(BROKER_URL='memory://')
def test_async_job():
Run Code Online (Sandbox Code Playgroud)
但行为与ampq代理相同:它会阻止测试等待结果.任何想法我怎么配置这个经纪人让它在测试中工作?
当我运行syncdb时,我注意到很多表创建如下:
django-kombu提供传输,因此它与实际队列无关.即使我运行任务,我仍然看不到这些表中没有填充任何内容.这些表用于什么?仅用于监控 - 如果我启用它?
如果是这样,如果我查找AsyncResult(),我猜这实际上是通过django-kombu表而不是djcelery查找任务结果吗?
谢谢.
我想在我的应用程序中使用消息库来与rabbitmq进行交互.谁能解释一下pika和kombu图书馆之间的区别?
我在Python中使用Kombu来使用持久的RabbitMQ队列.
在Windows中只有一个消费者使用队列.此消费者产生以下错误:
Traceback (most recent call last):
File ".\consumer_windows.py", line 66, in <module>
message.ack()
File "C:\Users\Administrator\Anaconda2\lib\site-packages\kombu\message.py", line 88, in ack
self.channel.basic_ack(self.delivery_tag)
File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\channel.py", line 1584, in basic_ack
self._send_method((60, 80), args)
File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\transport.py", line 182, in write_frame
frame_type, channel, size, payload, 0xce,
File "C:\Users\Administrator\Anaconda2\lib\socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 10054] An existing connection was forcibly closed by the remote …
Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个 celery 应用程序,将 numpy 数组(或任何任意对象)传递给工作人员。据我所知,这需要通过以下方式进行序列化pickle
(注意:我知道安全隐患,但在这种情况下这不是问题)。
但是,即使在尝试了我能找到的所有可能的方法来允许 pickle 作为序列化程序之后,我仍然收到以下 kombu 异常:
kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted
content of type pickle (application/x-python-serialize)
Run Code Online (Sandbox Code Playgroud)
我当前的文件目前是:
# tasks.py
from celery import Celery
app = Celery(
'tasks',
broker='redis://localhost',
accept_content=['pickle'],
task_serializer='pickle'
)
@app.task
def adding(x, y):
return x + y
if __name__ == '__main__':
import numpy as np
adding.apply_async((np.array([1]), np.array([1])), serializer='pickle')
Run Code Online (Sandbox Code Playgroud)
另外我有一个配置文件:
# celeryconfig.py
print('configuring...')
accept_content = ['pickle', 'application/x-python-serialize']
task_serializer = 'pickle'
result_serializer = 'pickle'
from kombu import serialization
serialization.register_pickle()
serialization.enable_insecure_serializers()
Run Code Online (Sandbox Code Playgroud)
但是,如果我运行 worker ( celery …
我正在使用Django和Celery,我正在尝试设置路由到多个队列.当我指定任务routing_key
和exchange
(在任务装饰器中或使用apply_async()
)时,任务不会添加到代理(这是Kombu连接到我的MySQL数据库).
如果我在任务装饰器中指定队列名称(这将意味着忽略路由键),则任务正常.它似乎是路由/交换设置的问题.
知道问题可能是什么?
这是设置:
settings.py
INSTALLED_APPS = (
...
'kombu.transport.django',
'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
'default': {
'binding_key':'task.#',
},
'i_tasks': {
'binding_key':'important_task.#',
},
}
Run Code Online (Sandbox Code Playgroud)
tasks.py
from celery.task import task
@task(routing_key='important_task.update')
def my_important_task():
try:
...
except Exception as exc:
my_important_task.retry(exc=exc)
Run Code Online (Sandbox Code Playgroud)
启动任务:
from tasks import my_important_task
my_important_task.delay()
Run Code Online (Sandbox Code Playgroud) 我们去年与芹菜合作,约有15名工人,每人都定义了1-4之间的并发性.
最近我们将芹菜从v3.1升级到v4.1
现在我们在每个worker日志中都有以下错误,任何想法会导致这样的错误?
2017-08-21 18:33:19,780 94794 ERROR Control command error: error(104, 'Connection reset by peer') [file: pidbox.py, line: 46]
Traceback (most recent call last):
File "/srv/dy/venv/lib/python2.7/site-packages/celery/worker/pidbox.py", line 42, in on_message
self.node.handle_message(body, message)
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
return self.dispatch(**body)
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
ticket=ticket)
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
serializer=self.mailbox.serializer)
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
**opts
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/srv/dy/venv/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/srv/dy/venv/lib/python2.7/site-packages/amqp/channel.py", line …
Run Code Online (Sandbox Code Playgroud) 我有一个单一队列的RabbitMQ交换.我希望创建一个运行多个线程的守护进程,并尽快通过此队列.
"工作"涉及与外部服务进行通信,因此每个消费者中都会有相当多的阻塞.因此,我希望有多个线程处理来自同一队列的消息.
我可以通过在我的主线程上使用队列来实现这一点,然后将传入的工作转移到其他线程池中,但是有没有办法在每个线程的上下文中启动多个使用者?
我有一个rabbitmq服务器和一个使用kombu的amqp使用者(python).
我已将我的应用程序安装在具有防火墙的系统中,该防火墙在1小时后关闭空闲连接.
这是我的amqp_consumer.py:
try:
# connections
with Connection(self.broker_url, ssl=_ssl, heartbeat=self.heartbeat) as conn:
chan = conn.channel()
# more stuff here
with conn.Consumer(queue, callbacks = [messageHandler], channel = chan):
# Process messages and handle events on all channels
while True:
conn.drain_events()
except Exception as e:
# do stuff
Run Code Online (Sandbox Code Playgroud)
我想要的是,如果防火墙关闭了连接,那么我想重新连接.我应该使用heartbeat参数还是应该将超时参数(3600秒)传递给drain_events()
函数?
两种选择有什么区别?(似乎做同样的事).
谢谢.