标签: kombu

芹菜与亚马逊SQS

我想使用Amazon SQS作为Celery支持的经纪人.有关于Kombu的SQS传输实现,Celery依赖它.但是没有足够的文档来使用它,所以我找不到如何在Celery上配置SQS.是否有人成功在Celery上配置SQS?

amazon-sqs amazon-web-services celery kombu

51
推荐指数
2
解决办法
2万
查看次数

用于芹菜单元测试的内存代理

我有一个用Django编写的REST API,其中和端点在发布芹菜任务时对其进行排队.响应包含任务ID,我想用它来测试任务是否已创建并获得结果.所以,我想做的事情如下:

def test_async_job():
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEquals(result, ...)
Run Code Online (Sandbox Code Playgroud)

我显然不想经营一个芹菜工人来进行单元测试,我希望以某种方式嘲笑它.我不能使用CELERY_ALWAYS_EAGER因为这似乎完全绕过经纪人,防止我使用AsyncResult通过其ID来获得任务(如说在这里).

通过芹菜和kombu文档,我发现有一个内存传输单元测试,这将做我正在寻找的.我尝试覆盖BROKER_URL设置以在测试中使用它:

@override_settings(BROKER_URL='memory://')
def test_async_job():
Run Code Online (Sandbox Code Playgroud)

但行为与ampq代理相同:它会阻止测试等待结果.任何想法我怎么配置这个经纪人让它在测试中工作?

django celery kombu

28
推荐指数
3
解决办法
7838
查看次数

什么是django-celery(djcelery)表?

当我运行syncdb时,我注意到很多表创建如下:

  • djcelery_crontabschedule
  • ...
  • djcelery_taskstate

django-kombu提供传输,因此它与实际队列无关.即使我运行任务,我仍然看不到这些表中没有填充任何内容.这些表用于什么?仅用于监控 - 如果我启用它?

如果是这样,如果我查找AsyncResult(),我猜这实际上是通过django-kombu表而不是djcelery查找任务结果吗?

谢谢.

django celery django-celery kombu

14
推荐指数
1
解决办法
4100
查看次数

任何人都可以告诉我python中的pika和kombu消息库之间有什么区别?

我想在我的应用程序中使用消息库来与rabbitmq进行交互.谁能解释一下pika和kombu图书馆之间的区别?

amqp rabbitmq python-2.7 kombu pika

13
推荐指数
1
解决办法
3036
查看次数

Rabbitmq错误:[Errno 10054]远程主机强行关闭现有连接

我在Python中使用Kombu来使用持久的RabbitMQ队列.

在Windows中只有一个消费者使用队列.此消费者产生以下错误:

Traceback (most recent call last):
  File ".\consumer_windows.py", line 66, in <module>
    message.ack()
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\kombu\message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "C:\Users\Administrator\Anaconda2\lib\socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10054] An existing connection was forcibly closed by the remote …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq kombu

10
推荐指数
1
解决办法
1778
查看次数

即使允许芹菜也不接受泡菜

我正在尝试编写一个 celery 应用程序,将 numpy 数组(或任何任意对象)传递给工作人员。据我所知,这需要通过以下方式进行序列化pickle(注意:我知道安全隐患,但在这种情况下这不是问题)。

但是,即使在尝试了我能找到的所有可能的方法来允许 pickle 作为序列化程序之后,我仍然收到以下 kombu 异常:

kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted
content of type pickle (application/x-python-serialize)
Run Code Online (Sandbox Code Playgroud)

我当前的文件目前是:

# tasks.py
from celery import Celery
app = Celery(
    'tasks',
    broker='redis://localhost',
    accept_content=['pickle'],
    task_serializer='pickle'
)

@app.task
def adding(x, y):
    return x + y

if __name__ == '__main__':
    import numpy as np
    adding.apply_async((np.array([1]), np.array([1])), serializer='pickle')
Run Code Online (Sandbox Code Playgroud)

另外我有一个配置文件:

# celeryconfig.py
print('configuring...')

accept_content = ['pickle', 'application/x-python-serialize']
task_serializer = 'pickle'
result_serializer = 'pickle'
from kombu import serialization
serialization.register_pickle()
serialization.enable_insecure_serializers()
Run Code Online (Sandbox Code Playgroud)

但是,如果我运行 worker ( celery …

python pickle celery kombu

9
推荐指数
1
解决办法
2231
查看次数

Django和Celery - 路由问题

我正在使用Django和Celery,我正在尝试设置路由到多个队列.当我指定任务routing_keyexchange(在任务装饰器中或使用apply_async())时,任务不会添加到代理(这是Kombu连接到我的MySQL数据库).

如果我在任务装饰器中指定队列名称(这将意味着忽略路由键),则任务正常.它似乎是路由/交换设置的问题.

知道问题可能是什么?

这是设置:

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}
Run Code Online (Sandbox Code Playgroud)

tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)
Run Code Online (Sandbox Code Playgroud)

启动任务:

from tasks import my_important_task
my_important_task.delay()
Run Code Online (Sandbox Code Playgroud)

django celery django-celery kombu

8
推荐指数
1
解决办法
1万
查看次数

芹菜升级(3.1-> 4.1) - 通过对等方重置连接

我们去年与芹菜合作,约有15名工人,每人都定义了1-4之间的并发性.

最近我们将芹菜从v3.1升级到v4.1

现在我们在每个worker日志中都有以下错误,任何想法会导致这样的错误?

2017-08-21 18:33:19,780 94794  ERROR   Control command error: error(104, 'Connection reset by peer') [file: pidbox.py, line: 46]
Traceback (most recent call last):
  File "/srv/dy/venv/lib/python2.7/site-packages/celery/worker/pidbox.py", line 42, in on_message
    self.node.handle_message(body, message)
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/srv/dy/venv/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/srv/dy/venv/lib/python2.7/site-packages/amqp/channel.py", line …
Run Code Online (Sandbox Code Playgroud)

python amqp rabbitmq celery kombu

8
推荐指数
1
解决办法
1917
查看次数

使用具有多个线程的rabbitmq消息队列(Python Kombu)

我有一个单一队列的RabbitMQ交换.我希望创建一个运行多个线程的守护进程,并尽快通过此队列.

"工作"涉及与外部服务进行通信,因此每个消费者中都会有相当多的阻塞.因此,我希望有多个线程处理来自同一队列的消息.

我可以通过在我的主线程上使用队列来实现这一点,然后将传入的工作转移到其他线程池中,但是有没有办法在每个线程的上下文中启动多个使用者?

python multithreading rabbitmq kombu

7
推荐指数
1
解决办法
3843
查看次数

RabbitMQ心跳与连接排除事件超时


我有一个rabbitmq服务器和一个使用kombu的am​​qp使用者(python).
我已将我的应用程序安装在具有防火墙的系统中,该防火墙在1小时后关闭空闲连接.
这是我的amqp_consumer.py:

try:
    # connections
    with Connection(self.broker_url, ssl=_ssl, heartbeat=self.heartbeat) as conn:
        chan = conn.channel()
        # more stuff here
        with conn.Consumer(queue, callbacks = [messageHandler], channel = chan):
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

except Exception as e:
    # do stuff
Run Code Online (Sandbox Code Playgroud)

我想要的是,如果防火墙关闭了连接,那么我想重新连接.我应该使用heartbeat参数还是应该将超时参数(3600秒)传递给drain_events()函数?
两种选择有什么区别?(似乎做同样的事).
谢谢.

python timeout heartbeat rabbitmq kombu

7
推荐指数
1
解决办法
981
查看次数