标签: celery

在Celery任务中运行Scrapy蜘蛛

我有一个Django站点,当用户请求它时会发生刮擦,我的代码在新进程中启动Scrapy蜘蛛独立脚本.当然,这不会增加用户.

像这样的东西:

class StandAloneSpider(Spider):
    #a regular spider

settings.overrides['LOG_ENABLED'] = True
#more settings can be changed...

crawler = CrawlerProcess( settings )
crawler.install()
crawler.configure()

spider = StandAloneSpider()

crawler.crawl( spider )
crawler.start()
Run Code Online (Sandbox Code Playgroud)

我决定使用Celery并使用worker来排队爬网请求.

但是,我遇到了Tornado反应堆无法重启的问题.第一个和第二个蜘蛛成功运行,但后续的蜘蛛会抛出ReactorNotRestartable错误.

任何人都可以在Celery框架内分享运行蜘蛛的任何提示?

python django scrapy celery

33
推荐指数
2
解决办法
1万
查看次数

Python + Celery:链接工作?

芹菜文件表明,这是一个坏主意,有任务等待的其他任务的结果.但建议的解决方案(见"好"标题)离开是可喜爱的东西.具体来说,没有明确的方法将子任务的结果返回给调用者(同样,它有点难看).

那么,有没有"链接"工作的方式,所以调用者得到最终工作的结果?例如,要使用add示例:

>>> add3 = add.subtask(args=(3, ))
>>> add.delay(1, 2, callback=add3).get()
6
Run Code Online (Sandbox Code Playgroud)

或者,返回Result的实例是否可以?例如:

@task
def add(x, y, callback=None):
    result = x + y
    if callback:
        return subtask(callback).delay(result)
    return result
Run Code Online (Sandbox Code Playgroud)

这将使链中"最终"工作的结果可以通过简单的方式重新获得:

result = add(1, 2, callback=add3).delay()
while isinstance(result, Result):
    result = result.get()
print "result:", result
Run Code Online (Sandbox Code Playgroud)

python celery

32
推荐指数
1
解决办法
2万
查看次数

如何将周期性任务发送到Celery中的特定队列

默认情况下,Celery将所有任务发送到"celery"队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Run Code Online (Sandbox Code Playgroud)

调度程序设置:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)

跑步工人:

python manage.py celery worker -c 1 -Q celery_periodic -B -E
Run Code Online (Sandbox Code Playgroud)

此方案无法按预期工作:此工作人员将定期任务发送到"芹菜"队列,而不是"celery_periodic".我该如何解决这个问题?

PS芹菜== 3.0.16

python celery django-celery

32
推荐指数
2
解决办法
1万
查看次数

Django Celery - 无法连接到amqp://guest@127.0.0.8000:5672 //

我正在尝试建立Django-Celery.我正在阅读教程

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

当我运行$ python manage.py celery worker --lvelvel = info时

我明白了

[Tasks]


/Users/msmith/Documents/dj/venv/lib/python2.7/site-packages/djcelery/loaders.py:133:     UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in     production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2013-08-08 11:15:25,368: WARNING/MainProcess] /Users/msmith/Documents/dj/venv/lib/python2.7/site-packages/djcelery/loaders.py:133: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2013-08-08 11:15:25,369: WARNING/MainProcess] celery@sfo-mpmgr ready.
[2013-08-08 11:15:25,382: ERROR/MainProcess] consumer: Cannot connect to     amqp://guest@127.0.0.1:5672/celeryvhost: [Errno 61] …
Run Code Online (Sandbox Code Playgroud)

python django celery

32
推荐指数
3
解决办法
4万
查看次数

芹菜花安全生产

我希望使用Flower(https://github.com/mher/flower)来监控我的Celery任务,而不是django-admin在他们的文档中建议(http://docs.celeryproject.org/en/latest) /userguide/monitoring.html#flower-real-time-celery-web-monitor).但是,因为我是新手,所以我对Flower的页面仅基于HTTP而不是HTTPS的方式感到有些困惑.如何为我的Celery任务启用安全性,以便任何旧用户都无法访问无需登录的网站http://flowerserver.com:5555并进行更改?

我已经考虑了Celery 自己的文档,但遗憾的是,没有提到如何保护Flower的api或web ui.它说:[Need more text here]

谢谢!

更新:我的问题部分与此处重复:如何向Django Celery Flower Monitoring添加身份验证和端点?

但是,我在这里通过询问如何在同一个远程机器上使用包含nginx,gunicorn和celery的环境来运行它来澄清他的问题.我也想知道如何设置Flower的外部可访问网址,但是如果可能的话,也更喜欢https而不是http(或某种方式保护webui并远程访问它).我还需要知道,对于任何可能获得Flower内部API访问权限的人来说,离开Flower运行是否存在相当大的安全风险,以及确保实现这一目标的最佳方式是什么,或者它是否应该完全禁用并仅用于as-需要的基础.

django nginx celery flower

32
推荐指数
3
解决办法
1万
查看次数

使用node.js创建Celery worker

使用node-celery,我们可以启用节点将Celery作业推送到任务队列.我们如何允许节点成为Celery工作者并使用队列?

javascript celery node.js node-celery

32
推荐指数
1
解决办法
4089
查看次数

如何使用@shared_task装饰器进行基于类的任务

文档所示,@shared_task装饰器允许您创建任务而无需任何具体的应用程序实例.给出的示例显示了如何装饰基于函数的任务.

如何装饰基于类的任务?

python celery

32
推荐指数
2
解决办法
2万
查看次数

基于Python的异步工作流模块:celery工作流和luigi工作流有什么区别?

我使用django作为Web框架.我需要一个工作流引擎,它可以执行同步和异步(批处理任务)任务链.我发现芹菜和luigi作为批处理工作流程.我的第一个问题是这两个模块之间的区别是什么.

Luigi允许我们重新运行失败的任务链,只有失败的子任务才能重新执行.那么芹菜:如果我们重新运行链(在修复失败的子任务代码之后),它会重新运行已经成功的子任务吗?

假设我有两个子任务.第一个创建一些文件,第二个读取这些文件.当我把它们放入芹菜链中时,整个链条由于第二个任务中的错误代码而失败.在第二个任务中修复代码后重新运行链时会发生什么?第一个任务是否会尝试重新创建这些文件?

python celery luigi

32
推荐指数
2
解决办法
6710
查看次数

为什么Celery不会在底层任务抛出异常时抛出异常

Celery似乎没有正确处理异常.

如果我有任务:

def errorTest():
    raise Exception()
Run Code Online (Sandbox Code Playgroud)

然后我打电话

r = errorTest.delay()
In [8]: r.result

In [9]: r.state
Out[9]: 'PENDING'
Run Code Online (Sandbox Code Playgroud)

它会无限期地挂起来.

去检查日志显示错误被抛出任务(如果你想要消息,请问),我知道后端和一切都设置正确,因为其他任务正常工作并正确返回结果.

在Celery中捕捉异常需要做些什么时髦吗?

/ Celery版本是3.0.13,经纪人是RabbitMQ在我的本地机器上运行

python exception-handling celery

31
推荐指数
2
解决办法
2万
查看次数

使用mock在Django单元测试中修补芹菜任务

我正在尝试使用python模拟库来修补在我的django应用程序中保存模型时运行的Celery任务,以查看它是否被正确调用.

基本上,任务在内部定义myapp.tasks,并在我的models.py文件的顶部导入,如下所示:

from .tasks import mytask

...然后save()使用在模型内部运行mytask.delay(foo, bar).到目前为止一切都很好 - 当我实际运行Celeryd等时工作得很好.

我想构建一个模拟任务的单元测试,只是为了检查它是否使用正确的参数调用,并且实际上并没有尝试运行Celery任务.

所以在测试文件中,我在标准的TestCase中有这样的东西:

from mock import patch # at the top of the file

# ...then later
def test_celery_task(self):
    with patch('myapp.models.mytask.delay') as mock_task:
        # ...create an instance of the model and save it etc
        self.assertTrue(mock_task.called)
Run Code Online (Sandbox Code Playgroud)

...但它永远不会被调用/总是错误的.我已经尝试了各种化身(myapp.models.mytask改为修补,并检查是否mock_task.delay被调用.我从模拟文档中收集了导入路径至关重要,谷歌搜索告诉我它应该是在模块中看到的路径测试(如果我理解正确的话,那将是myapp.models.mytask.delay而不是myapp.tasks.mytask.delay).

我在哪里错了?在修补Celery任务时是否存在一些特定的困难?我可以修补celery.task(用作装饰器mytask)吗?

python django unit-testing mocking celery

31
推荐指数
2
解决办法
1万
查看次数