标签: celery

如何协调 celerybeat 守护进程集群?

我有一个由三台机器组成的集群。我想celery beat在这些上奔跑。我有几个相关问题。

\n\n
    \n
  1. Celery 有持久调度程序的概念。只要我的日程安排仅包含 crontab 条目并且由 静态定义CELERYBEAT_SCHEDULE,我是否需要保留它?
  2. \n
  3. 如果这样做,那么我是否必须确保该存储在集群的所有机器之间同步?
  4. \n
  5. 是否djcelery.schedulers.DatabaseScheduler自动处理并发节拍守护进程?也就是说,如果我只使用 运行三个节拍守护进程DatabaseScheduler,我是否可以免受重复任务的影响?
  6. \n
  7. 有没有类似DatabaseScheduler但基于 MongoDB 的东西,没有 Django ORM?像 Celery\xe2\x80\x99s 一样拥有 MongoDB 代理和结果后端。
  8. \n
\n

concurrency mongodb celery celerybeat

1
推荐指数
1
解决办法
1542
查看次数

Celery + gevent 仅使用一个CPU核心

我在使用 gevent 运行 Celery 时遇到性能负载问题,所有内容都在我的 VPS 上的同一核心上运行。

这是 4 个 Celery 实例的屏幕截图,每个实例有 20 个 gevent 并发。

如何解决这个问题?我究竟做错了什么 ?

这是我的第一个任务:

def update_sender():
    items = models.Item.objects.filter(active=True).all()
    count = items.count()
    items = [i.id for i in items]
    step = count / settings.WORKERS
    for job in list(chunks(items, step)):
        update_item.apply_async(args=[job])
Run Code Online (Sandbox Code Playgroud)

调用以下子任务:

def update_item(items):
    for item in items:
        try:
            i = models.Item.objects.get(id=item)
            url = "someurl"
            rep = requests.get(url)
            jrep = rep.json()
            tracker = ItemTracker(i, jrep)
            if tracker.skip():
                continue
            if tracker.method1():
                if not tracker.method2():
                    tracker.method3()
                tracker.save()
Run Code Online (Sandbox Code Playgroud)

这都是关于同时执行大量 …

python celery gevent

1
推荐指数
1
解决办法
2119
查看次数

如何在 Celery 中将任务添加到组中?

我想在我的代码中逐步构建一Celery 任务,因为我将在循环中根据逻辑创建任务。

例如:

my_group = group()
for item in items:
    if item.is_special():
        # This doesn't work...
        my_group.add(special_processing.s(item.id))
    else:
        my_group.add(regular_processing.s(item.id))

 res = my_group()
Run Code Online (Sandbox Code Playgroud)

我读过组是部分,这很好,但是您如何组合部分以便它们形成一个组?

python distributed-computing celery

1
推荐指数
1
解决办法
2321
查看次数

通过 Sentinel 将 Celery 连接到 Redis

我如何告诉 celery 我在 redis 上寻找的服务名称?我正在尝试使用 Celery 4 中的内置 Sentinel 支持。我正在传递一个按文档所述配置的代理 URL :sentinel://0.0.0.0:26379

但是 redis 似乎抱怨没有通过 service_name:

File "/usr/local/lib/python2.7/dist-packages/redis/sentinel.py", line 222, in discover_master
    raise MasterNotFoundError("No master found for %r" % (service_name,))
OperationalError: No master found for None
Run Code Online (Sandbox Code Playgroud)

是否可以使用此 URL 格式传递 service_name ?我试过了

sentinel://0.0.0.0:26379/my_service
sentinel://0.0.0.0:26379/0/my_service
Run Code Online (Sandbox Code Playgroud)

我找不到任何关于连接 URL 的文档——我找到了redis-sentinel-URL,但我没有看到它包含在 redis 包中,所以我什至不确定它是否被 redis 使用。

redis celery redis-sentinel

1
推荐指数
1
解决办法
3037
查看次数

CELERYD_CONCURRENCY, --concurrency 和 autoscale

我有几个关于任务路由、并发性和性能的问题。这是我的用例:

我有一台专用服务器来运行 celery 任务,因此我可以使用所有 CPU 在该服务器上运行 celery 工作程序。

我有很多不同的 python 任务,我使用:CELERY_ROUTES 进行路由,因为这些任务执行真正不同类型的 python 代码,我创建了 5 个不同的 worker。这些工作器是在我使用 ansible 部署我的项目时创建的,这是一个示例:

[program:default_queue-celery]
command={{ venv_dir }}/bin/celery worker --app=django_coreapp --loglevel=INFO --concurrency=1 --autoscale=15,10 --queues=default_queue
environment =
    SERVER_TYPE="{{ SERVER_TYPE }}",
    DB_SCHEMA="{{ DB_SCHEMA }}",
    DB_USER="{{ DB_USER }}",
    DB_PASS="{{ DB_PASS }}",
    DB_HOST="{{ DB_HOST }}"
directory={{ git_dir }}
user={{ user }}
group={{ group }}
stdout_logfile={{ log_dir }}/default_queue.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=5
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true 
Run Code Online (Sandbox Code Playgroud)

我在 settings.py 中还有一个 CELERY_QUEUES 来在 CELERY_ROUTES 和我的芹菜程序(队列)之间架起桥梁

CELERY_DEFAULT_QUEUE = 'default_queue'
Run Code Online (Sandbox Code Playgroud)

如果碰巧我没有路由任务,它将转到我的“default_queue”

为了给我的所有队列留出空间,我将 default_queue 的 --concurrency …

python django concurrency multithreading celery

1
推荐指数
1
解决办法
3250
查看次数

带有 django 的 Tensorflow/Keras 与 celery 无法正常工作

我们正在从视频中构建用于人脸识别的脚本,主要使用 tensorflow 来实现基本的识别功能。

当我们直接使用 a python test-reco.py(以视频路径作为参数)尝试 soft 时,它完美地工作。

现在我们正在尝试通过我们的网站将它集成到 celery 任务中。

下面是主要代码:

def extract_labels(self, path_to_video):
    if not os.path.exists(path_to_video):
        print("NO VIDEO!")
        return None
    video = VideoFileClip(path_to_video)
    n_frames = int(video.fps * video.duration)

    out = []
    for i, frame in enumerate(video.iter_frames()):
        if self.verbose > 0:
            print(
                'processing frame:',
                str(i).zfill(len(str(n_frames))),
                '/',
                n_frames
            )

        try:
            rect = face_detector(frame[::2, ::2], 0)[0]
            y0, x0, y1, x1 = np.array([rect.left(), rect.top(), rect.right(), rect.bottom()])*2
            bbox = frame[x0:x1, y0:y1]
            bbox = resize(bbox, [128, 128])
            bbox = rgb2gray(bbox)
            bbox = …
Run Code Online (Sandbox Code Playgroud)

django celery keras tensorflow

1
推荐指数
1
解决办法
1617
查看次数

docker 中的 Celery 工作人员将无法获得正确的消息代理

我正在使用应用程序工厂模式创建 Flask 服务,并且需要使用 celery 来执行异步任务。我还使用 docker 和 docker-compose 来包含和运行所有内容。我的结构如下所示:

server
 |
 +-- manage.py
 +-- docker-compose.yml
 +-- requirements.txt
 +-- Dockerfile
 |    
 +-- project
 |  |  
 |  +-- api
 |      |
 |      +--tasks.py
 |
 |  +-- __init__.py
Run Code Online (Sandbox Code Playgroud)

我的tasks.py文件如下所示:

from project import celery_app

@celery_app.task
def celery_check(test):
    print(test)
Run Code Online (Sandbox Code Playgroud)

我调用manage.pyrun ,如下所示:

# manage.py

from flask_script import Manager
from project import create_app

app = create_app()
manager = Manager(app)

if __name__ == '__main__':
    manager.run()
Run Code Online (Sandbox Code Playgroud)

我的__init__.py看起来像这样:

# project/__init__.py

import os
import json …
Run Code Online (Sandbox Code Playgroud)

python celery flask docker docker-compose

1
推荐指数
1
解决办法
1475
查看次数

似乎无法从芹菜任务修改缓存值

描述:

我想要一个缓存值(让我们称之为 a flag)来知道 celery 任务何时完成执行。我有一个视图让前端轮询这个标志,直到它变成False.

代码:

  • settings.py

    ...
    MEMCACHED_URL = os.getenv('MEMCACHED_URL', None) # Cache of devel or production
    if MEMCACHED_URL:
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
                'LOCATION': MEMCACHED_URL,
             }
        }
    else:
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
                'LOCATION': 'unique-snowflake',
            }
        }
    
    Run Code Online (Sandbox Code Playgroud)
  • api/views.py

    def a_view(request):
        # Do some stuff
        cache.add(generated_flag_key, True)
        tasks.my_celery_task.apply_async([argument_1, ..., generated_flag_key])
        # Checking here with cache.get(generated_flag_key), the value is True.
        # Do other stuff.
    
    Run Code Online (Sandbox Code Playgroud)
  • tasks.py

    @shared_task
    def my_celery_task(argument_1, ..., …
    Run Code Online (Sandbox Code Playgroud)

python django django-cache celery django-rest-framework

1
推荐指数
1
解决办法
1385
查看次数

如何使用celery定期在django中安排我的爬虫功能?

在这里,我有一个视图CrawlerHomeView,用于从表单创建任务对象,现在我想用 celery 定期安排此任务。

我想CrawlerHomeView用任务对象 search_frequency 并通过检查一些任务对象字段来安排这个过程。

任务模型

class Task(models.Model):
    INITIAL = 0
    STARTED = 1
    COMPLETED = 2

    task_status = (
        (INITIAL, 'running'),
        (STARTED, 'running'),
        (COMPLETED, 'completed'),
        (ERROR, 'error')
    )

    FREQUENCY = (
        ('1', '1 hrs'),
        ('2', '2 hrs'),
        ('6', '6 hrs'),
        ('8', '8 hrs'),
        ('10', '10 hrs'),
    )

    name = models.CharField(max_length=255)
    scraping_end_date = models.DateField(null=True, blank=True)
    search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
    status = models.IntegerField(choices=task_status)
Run Code Online (Sandbox Code Playgroud)

任务.py

如果任务状态为 0 或 1 且未超过任务抓取结束日期,我想定期运行下面发布的视图 [期间 =(任务的搜索频率时间])。但我卡在这里了。我该怎么做?

@periodic_task(run_every=crontab(hour="task.search_frequency"))  # how …
Run Code Online (Sandbox Code Playgroud)

django django-views celery web-scraping

1
推荐指数
1
解决办法
938
查看次数

* 后的 schedule_task() 参数必须是可迭代的,而不是 int

在这里我使用的apply_async方法与countdownexpires参数一些倒计时后执行任务,并在到期日期时间的某些任务。

但我收到了这个错误

Django Version: 3.0.6 
Exception Type: TypeError
 Exception Value:    schedule_task() argument after * must be an iterable, not int
Run Code Online (Sandbox Code Playgroud)

如何解决这个错误?

任务

@periodic_task(run_every=crontab(minute=1), ignore_result=False)  
def schedule_task(pk):

    task = Task.objects.get(pk=pk)
    unique_id = str(uuid4()) 
Run Code Online (Sandbox Code Playgroud)

意见

form = CreateTaskForm(request.POST)
        if form.is_valid():

            unique_id = str(uuid4())
            obj = form.save(commit=False)           
            obj.created_by = request.user
            obj.unique_id = unique_id
            obj.status = 0
            obj.save()
            form.save_m2m()
            # schedule_task.delay(obj.pk)
            schedule_task.apply_async((obj.pk),expires=datetime.datetime.now() + datetime.timedelta(minutes=5), countdown=int(obj.search_frequency))

            return redirect('crawler:task-list')
Run Code Online (Sandbox Code Playgroud)

django django-views celery

1
推荐指数
1
解决办法
666
查看次数