我有一个由三台机器组成的集群。我想celery beat
在这些上奔跑。我有几个相关问题。
CELERYBEAT_SCHEDULE
,我是否需要保留它?djcelery.schedulers.DatabaseScheduler
自动处理并发节拍守护进程?也就是说,如果我只使用 运行三个节拍守护进程DatabaseScheduler
,我是否可以免受重复任务的影响?DatabaseScheduler
但基于 MongoDB 的东西,没有 Django ORM?像 Celery\xe2\x80\x99s 一样拥有 MongoDB 代理和结果后端。我在使用 gevent 运行 Celery 时遇到性能负载问题,所有内容都在我的 VPS 上的同一核心上运行。
这是 4 个 Celery 实例的屏幕截图,每个实例有 20 个 gevent 并发
如何解决这个问题?我究竟做错了什么 ?
这是我的第一个任务:
def update_sender():
items = models.Item.objects.filter(active=True).all()
count = items.count()
items = [i.id for i in items]
step = count / settings.WORKERS
for job in list(chunks(items, step)):
update_item.apply_async(args=[job])
Run Code Online (Sandbox Code Playgroud)
调用以下子任务:
def update_item(items):
for item in items:
try:
i = models.Item.objects.get(id=item)
url = "someurl"
rep = requests.get(url)
jrep = rep.json()
tracker = ItemTracker(i, jrep)
if tracker.skip():
continue
if tracker.method1():
if not tracker.method2():
tracker.method3()
tracker.save()
Run Code Online (Sandbox Code Playgroud)
这都是关于同时执行大量 …
我想在我的代码中逐步构建一组Celery 任务,因为我将在循环中根据逻辑创建任务。
例如:
my_group = group()
for item in items:
if item.is_special():
# This doesn't work...
my_group.add(special_processing.s(item.id))
else:
my_group.add(regular_processing.s(item.id))
res = my_group()
Run Code Online (Sandbox Code Playgroud)
我读过组是部分,这很好,但是您如何组合部分以便它们形成一个组?
我如何告诉 celery 我在 redis 上寻找的服务名称?我正在尝试使用 Celery 4 中的内置 Sentinel 支持。我正在传递一个按文档中所述配置的代理 URL :sentinel://0.0.0.0:26379
但是 redis 似乎抱怨没有通过 service_name:
File "/usr/local/lib/python2.7/dist-packages/redis/sentinel.py", line 222, in discover_master
raise MasterNotFoundError("No master found for %r" % (service_name,))
OperationalError: No master found for None
Run Code Online (Sandbox Code Playgroud)
是否可以使用此 URL 格式传递 service_name ?我试过了
sentinel://0.0.0.0:26379/my_service
sentinel://0.0.0.0:26379/0/my_service
Run Code Online (Sandbox Code Playgroud)
我找不到任何关于连接 URL 的文档——我找到了redis-sentinel-URL,但我没有看到它包含在 redis 包中,所以我什至不确定它是否被 redis 使用。
我有几个关于任务路由、并发性和性能的问题。这是我的用例:
我有一台专用服务器来运行 celery 任务,因此我可以使用所有 CPU 在该服务器上运行 celery 工作程序。
我有很多不同的 python 任务,我使用:CELERY_ROUTES 进行路由,因为这些任务执行真正不同类型的 python 代码,我创建了 5 个不同的 worker。这些工作器是在我使用 ansible 部署我的项目时创建的,这是一个示例:
[program:default_queue-celery]
command={{ venv_dir }}/bin/celery worker --app=django_coreapp --loglevel=INFO --concurrency=1 --autoscale=15,10 --queues=default_queue
environment =
SERVER_TYPE="{{ SERVER_TYPE }}",
DB_SCHEMA="{{ DB_SCHEMA }}",
DB_USER="{{ DB_USER }}",
DB_PASS="{{ DB_PASS }}",
DB_HOST="{{ DB_HOST }}"
directory={{ git_dir }}
user={{ user }}
group={{ group }}
stdout_logfile={{ log_dir }}/default_queue.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=5
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
Run Code Online (Sandbox Code Playgroud)
我在 settings.py 中还有一个 CELERY_QUEUES 来在 CELERY_ROUTES 和我的芹菜程序(队列)之间架起桥梁
CELERY_DEFAULT_QUEUE = 'default_queue'
Run Code Online (Sandbox Code Playgroud)
如果碰巧我没有路由任务,它将转到我的“default_queue”
为了给我的所有队列留出空间,我将 default_queue 的 --concurrency …
我们正在从视频中构建用于人脸识别的脚本,主要使用 tensorflow 来实现基本的识别功能。
当我们直接使用 a python test-reco.py
(以视频路径作为参数)尝试 soft 时,它完美地工作。
现在我们正在尝试通过我们的网站将它集成到 celery 任务中。
下面是主要代码:
def extract_labels(self, path_to_video):
if not os.path.exists(path_to_video):
print("NO VIDEO!")
return None
video = VideoFileClip(path_to_video)
n_frames = int(video.fps * video.duration)
out = []
for i, frame in enumerate(video.iter_frames()):
if self.verbose > 0:
print(
'processing frame:',
str(i).zfill(len(str(n_frames))),
'/',
n_frames
)
try:
rect = face_detector(frame[::2, ::2], 0)[0]
y0, x0, y1, x1 = np.array([rect.left(), rect.top(), rect.right(), rect.bottom()])*2
bbox = frame[x0:x1, y0:y1]
bbox = resize(bbox, [128, 128])
bbox = rgb2gray(bbox)
bbox = …
Run Code Online (Sandbox Code Playgroud) 我正在使用应用程序工厂模式创建 Flask 服务,并且需要使用 celery 来执行异步任务。我还使用 docker 和 docker-compose 来包含和运行所有内容。我的结构如下所示:
server
|
+-- manage.py
+-- docker-compose.yml
+-- requirements.txt
+-- Dockerfile
|
+-- project
| |
| +-- api
| |
| +--tasks.py
|
| +-- __init__.py
Run Code Online (Sandbox Code Playgroud)
我的tasks.py
文件如下所示:
from project import celery_app
@celery_app.task
def celery_check(test):
print(test)
Run Code Online (Sandbox Code Playgroud)
我调用manage.py
run ,如下所示:
# manage.py
from flask_script import Manager
from project import create_app
app = create_app()
manager = Manager(app)
if __name__ == '__main__':
manager.run()
Run Code Online (Sandbox Code Playgroud)
我的__init__.py
看起来像这样:
# project/__init__.py
import os
import json …
Run Code Online (Sandbox Code Playgroud) 描述:
我想要一个缓存值(让我们称之为 a flag
)来知道 celery 任务何时完成执行。我有一个视图让前端轮询这个标志,直到它变成False
.
代码:
settings.py
:
...
MEMCACHED_URL = os.getenv('MEMCACHED_URL', None) # Cache of devel or production
if MEMCACHED_URL:
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
'LOCATION': MEMCACHED_URL,
}
}
else:
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'unique-snowflake',
}
}
Run Code Online (Sandbox Code Playgroud)api/views.py
:
def a_view(request):
# Do some stuff
cache.add(generated_flag_key, True)
tasks.my_celery_task.apply_async([argument_1, ..., generated_flag_key])
# Checking here with cache.get(generated_flag_key), the value is True.
# Do other stuff.
Run Code Online (Sandbox Code Playgroud)tasks.py
:
@shared_task
def my_celery_task(argument_1, ..., …
Run Code Online (Sandbox Code Playgroud)在这里,我有一个视图CrawlerHomeView
,用于从表单创建任务对象,现在我想用 celery 定期安排此任务。
我想CrawlerHomeView
用任务对象 search_frequency 并通过检查一些任务对象字段来安排这个过程。
任务模型
class Task(models.Model):
INITIAL = 0
STARTED = 1
COMPLETED = 2
task_status = (
(INITIAL, 'running'),
(STARTED, 'running'),
(COMPLETED, 'completed'),
(ERROR, 'error')
)
FREQUENCY = (
('1', '1 hrs'),
('2', '2 hrs'),
('6', '6 hrs'),
('8', '8 hrs'),
('10', '10 hrs'),
)
name = models.CharField(max_length=255)
scraping_end_date = models.DateField(null=True, blank=True)
search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
status = models.IntegerField(choices=task_status)
Run Code Online (Sandbox Code Playgroud)
任务.py
如果任务状态为 0 或 1 且未超过任务抓取结束日期,我想定期运行下面发布的视图 [期间 =(任务的搜索频率时间])。但我卡在这里了。我该怎么做?
@periodic_task(run_every=crontab(hour="task.search_frequency")) # how …
Run Code Online (Sandbox Code Playgroud) 在这里我使用的apply_async
方法与countdown
和expires
参数一些倒计时后执行任务,并在到期日期时间的某些任务。
但我收到了这个错误
Django Version: 3.0.6
Exception Type: TypeError
Exception Value: schedule_task() argument after * must be an iterable, not int
Run Code Online (Sandbox Code Playgroud)
如何解决这个错误?
任务
@periodic_task(run_every=crontab(minute=1), ignore_result=False)
def schedule_task(pk):
task = Task.objects.get(pk=pk)
unique_id = str(uuid4())
Run Code Online (Sandbox Code Playgroud)
意见
form = CreateTaskForm(request.POST)
if form.is_valid():
unique_id = str(uuid4())
obj = form.save(commit=False)
obj.created_by = request.user
obj.unique_id = unique_id
obj.status = 0
obj.save()
form.save_m2m()
# schedule_task.delay(obj.pk)
schedule_task.apply_async((obj.pk),expires=datetime.datetime.now() + datetime.timedelta(minutes=5), countdown=int(obj.search_frequency))
return redirect('crawler:task-list')
Run Code Online (Sandbox Code Playgroud) celery ×10
django ×5
python ×5
concurrency ×2
django-views ×2
celerybeat ×1
django-cache ×1
docker ×1
flask ×1
gevent ×1
keras ×1
mongodb ×1
redis ×1
tensorflow ×1
web-scraping ×1