我有一个Django站点,当用户请求它时会发生刮擦,我的代码在新进程中启动Scrapy蜘蛛独立脚本.当然,这不会增加用户.
像这样的东西:
class StandAloneSpider(Spider):
#a regular spider
settings.overrides['LOG_ENABLED'] = True
#more settings can be changed...
crawler = CrawlerProcess( settings )
crawler.install()
crawler.configure()
spider = StandAloneSpider()
crawler.crawl( spider )
crawler.start()
Run Code Online (Sandbox Code Playgroud)
我决定使用Celery并使用worker来排队爬网请求.
但是,我遇到了Tornado反应堆无法重启的问题.第一个和第二个蜘蛛成功运行,但后续的蜘蛛会抛出ReactorNotRestartable错误.
任何人都可以在Celery框架内分享运行蜘蛛的任何提示?
将芹菜文件表明,这是一个坏主意,有任务等待的其他任务的结果.但建议的解决方案(见"好"标题)离开是可喜爱的东西.具体来说,没有明确的方法将子任务的结果返回给调用者(同样,它有点难看).
那么,有没有"链接"工作的方式,所以调用者得到最终工作的结果?例如,要使用add示例:
>>> add3 = add.subtask(args=(3, ))
>>> add.delay(1, 2, callback=add3).get()
6
Run Code Online (Sandbox Code Playgroud)
或者,返回Result的实例是否可以?例如:
@task
def add(x, y, callback=None):
result = x + y
if callback:
return subtask(callback).delay(result)
return result
Run Code Online (Sandbox Code Playgroud)
这将使链中"最终"工作的结果可以通过简单的方式重新获得:
result = add(1, 2, callback=add3).delay()
while isinstance(result, Result):
result = result.get()
print "result:", result
Run Code Online (Sandbox Code Playgroud) 默认情况下,Celery将所有任务发送到"celery"队列,但您可以通过添加额外参数来更改此行为:
@task(queue='celery_periodic')
def recalc_last_hour():
log.debug('sending new task')
recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Run Code Online (Sandbox Code Playgroud)
调度程序设置:
CELERYBEAT_SCHEDULE = {
'installer_recalc_hour': {
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15 # every 15 sec for test
},
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)
跑步工人:
python manage.py celery worker -c 1 -Q celery_periodic -B -E
Run Code Online (Sandbox Code Playgroud)
此方案无法按预期工作:此工作人员将定期任务发送到"芹菜"队列,而不是"celery_periodic".我该如何解决这个问题?
PS芹菜== 3.0.16
我正在尝试建立Django-Celery.我正在阅读教程
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
当我运行$ python manage.py celery worker --lvelvel = info时
我明白了
[Tasks]
/Users/msmith/Documents/dj/venv/lib/python2.7/site-packages/djcelery/loaders.py:133: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2013-08-08 11:15:25,368: WARNING/MainProcess] /Users/msmith/Documents/dj/venv/lib/python2.7/site-packages/djcelery/loaders.py:133: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2013-08-08 11:15:25,369: WARNING/MainProcess] celery@sfo-mpmgr ready.
[2013-08-08 11:15:25,382: ERROR/MainProcess] consumer: Cannot connect to amqp://guest@127.0.0.1:5672/celeryvhost: [Errno 61] …Run Code Online (Sandbox Code Playgroud) 我希望使用Flower(https://github.com/mher/flower)来监控我的Celery任务,而不是django-admin在他们的文档中建议(http://docs.celeryproject.org/en/latest) /userguide/monitoring.html#flower-real-time-celery-web-monitor).但是,因为我是新手,所以我对Flower的页面仅基于HTTP而不是HTTPS的方式感到有些困惑.如何为我的Celery任务启用安全性,以便任何旧用户都无法访问无需登录的网站http://flowerserver.com:5555并进行更改?
我已经考虑了Celery 自己的文档,但遗憾的是,没有提到如何保护Flower的api或web ui.它说:[Need more text here]
谢谢!
更新:我的问题部分与此处重复:如何向Django Celery Flower Monitoring添加身份验证和端点?
但是,我在这里通过询问如何在同一个远程机器上使用包含nginx,gunicorn和celery的环境来运行它来澄清他的问题.我也想知道如何设置Flower的外部可访问网址,但是如果可能的话,也更喜欢https而不是http(或某种方式保护webui并远程访问它).我还需要知道,对于任何可能获得Flower内部API访问权限的人来说,离开Flower运行是否存在相当大的安全风险,以及确保实现这一目标的最佳方式是什么,或者它是否应该完全禁用并仅用于as-需要的基础.
使用node-celery,我们可以启用节点将Celery作业推送到任务队列.我们如何允许节点成为Celery工作者并使用队列?
如文档所示,@shared_task装饰器允许您创建任务而无需任何具体的应用程序实例.给出的示例显示了如何装饰基于函数的任务.
如何装饰基于类的任务?
我使用django作为Web框架.我需要一个工作流引擎,它可以执行同步和异步(批处理任务)任务链.我发现芹菜和luigi作为批处理工作流程.我的第一个问题是这两个模块之间的区别是什么.
Luigi允许我们重新运行失败的任务链,只有失败的子任务才能重新执行.那么芹菜:如果我们重新运行链(在修复失败的子任务代码之后),它会重新运行已经成功的子任务吗?
假设我有两个子任务.第一个创建一些文件,第二个读取这些文件.当我把它们放入芹菜链中时,整个链条由于第二个任务中的错误代码而失败.在第二个任务中修复代码后重新运行链时会发生什么?第一个任务是否会尝试重新创建这些文件?
Celery似乎没有正确处理异常.
如果我有任务:
def errorTest():
raise Exception()
Run Code Online (Sandbox Code Playgroud)
然后我打电话
r = errorTest.delay()
In [8]: r.result
In [9]: r.state
Out[9]: 'PENDING'
Run Code Online (Sandbox Code Playgroud)
它会无限期地挂起来.
去检查日志显示错误被抛出任务(如果你想要消息,请问),我知道后端和一切都设置正确,因为其他任务正常工作并正确返回结果.
在Celery中捕捉异常需要做些什么时髦吗?
/ Celery版本是3.0.13,经纪人是RabbitMQ在我的本地机器上运行
我正在尝试使用python模拟库来修补在我的django应用程序中保存模型时运行的Celery任务,以查看它是否被正确调用.
基本上,任务在内部定义myapp.tasks,并在我的models.py文件的顶部导入,如下所示:
from .tasks import mytask
...然后save()使用在模型内部运行mytask.delay(foo, bar).到目前为止一切都很好 - 当我实际运行Celeryd等时工作得很好.
我想构建一个模拟任务的单元测试,只是为了检查它是否使用正确的参数调用,并且实际上并没有尝试运行Celery任务.
所以在测试文件中,我在标准的TestCase中有这样的东西:
from mock import patch # at the top of the file
# ...then later
def test_celery_task(self):
with patch('myapp.models.mytask.delay') as mock_task:
# ...create an instance of the model and save it etc
self.assertTrue(mock_task.called)
Run Code Online (Sandbox Code Playgroud)
...但它永远不会被调用/总是错误的.我已经尝试了各种化身(myapp.models.mytask改为修补,并检查是否mock_task.delay被调用.我从模拟文档中收集了导入路径至关重要,谷歌搜索告诉我它应该是在模块中看到的路径测试(如果我理解正确的话,那将是myapp.models.mytask.delay而不是myapp.tasks.mytask.delay).
我在哪里错了?在修补Celery任务时是否存在一些特定的困难?我可以修补celery.task(用作装饰器mytask)吗?
celery ×10
python ×8
django ×4
flower ×1
javascript ×1
luigi ×1
mocking ×1
nginx ×1
node-celery ×1
node.js ×1
scrapy ×1
unit-testing ×1