How do I schedule my crawler function in Django to run periodically with Celery?

Asked by D_P*_*D_P (score 1) · Tags: django, django-views, celery, web-scraping

I have a view, CrawlerHomeView, that creates a Task object from a form, and I now want to schedule that task to run periodically with Celery.

I want to schedule the process using the Task object's search_frequency value, after checking a few other fields on the task object.

The Task model:

from django.db import models


class Task(models.Model):
    INITIAL = 0
    STARTED = 1
    COMPLETED = 2
    ERROR = 3  # was referenced below but never defined

    task_status = (
        (INITIAL, 'initial'),
        (STARTED, 'running'),
        (COMPLETED, 'completed'),
        (ERROR, 'error'),
    )

    FREQUENCY = (
        ('1', '1 hrs'),
        ('2', '2 hrs'),
        ('6', '6 hrs'),
        ('8', '8 hrs'),
        ('10', '10 hrs'),
    )

    name = models.CharField(max_length=255)
    scraping_end_date = models.DateField(null=True, blank=True)
    search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
    status = models.IntegerField(choices=task_status)

tasks.py

If the task's status is 0 or 1 and its scraping end date has not passed, I want to run the view posted below periodically, with the period taken from the task's search_frequency. But this is where I'm stuck. How do I do this?

import datetime

from celery.schedules import crontab
from celery.task import periodic_task


@periodic_task(run_every=crontab(hour="task.search_frequency"))  # how do I use the task's search_frequency value here?
def schedule_task(pk):
    task = Task.objects.get(pk=pk)
    # status must be INITIAL (0) or STARTED (1), and the end date must not have passed
    if task.status in (Task.INITIAL, Task.STARTED) and not datetime.date.today() > task.scraping_end_date:

        # perform the crawl function ---> def crawl()  how??

        if task.scraping_end_date == datetime.date.today():
            task.status = Task.COMPLETED
            task.save()  # mark the task as completed
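For context, with stock Celery the usual way to get a per-row schedule like this is django-celery-beat, which stores each schedule in the database rather than in the decorator. A rough sketch of that approach (assuming django-celery-beat is installed, and that the task above is registered under the hypothetical dotted path 'crawler.tasks.schedule_task'):

import json

from django_celery_beat.models import IntervalSchedule, PeriodicTask


def register_periodic_crawl(task):
    """Create or update a DB-backed schedule matching this Task's search_frequency."""
    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=int(task.search_frequency),  # '2' -> every 2 hours
        period=IntervalSchedule.HOURS,
    )
    PeriodicTask.objects.update_or_create(
        name=f'crawl-task-{task.pk}',  # must be unique per Task
        defaults={
            'interval': schedule,
            'task': 'crawler.tasks.schedule_task',  # dotted path is an assumption
            'args': json.dumps([task.pk]),
        },
    )

Calling something like register_periodic_crawl(obj) after saving the form would register the schedule; a beat process started with --scheduler django_celery_beat.schedulers:DatabaseScheduler then picks it up without a restart.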

views.py

I want to run this view periodically. How do I do that?

from urllib.parse import urlparse
from uuid import uuid4

from django.contrib.auth.mixins import LoginRequiredMixin
from django.shortcuts import redirect, render
from django.views import View

# app-local imports; module paths are assumptions
from .forms import CreateTaskForm
from .models import Category, Keyword, Task, TargetSite

# assuming the python-scrapyd-api client pointed at a local scrapyd server
from scrapyd_api import ScrapydAPI

scrapyd = ScrapydAPI('http://localhost:6800')


class CrawlerHomeView(LoginRequiredMixin, View):
    login_url = 'users:login'

    def get(self, request, *args, **kwargs):
        context = {
            'targets': TargetSite.objects.all(),
            'keywords': Keyword.objects.all(),
            'frequency': Task(),  # unsaved instance (presumably for the FREQUENCY choices in the template)
            'form': CreateTaskForm(),
            'categories': Category.objects.all(),
        }
        return render(request, 'index.html', context)

    def post(self, request, *args, **kwargs):
        form = CreateTaskForm(request.POST)
        if form.is_valid():
            unique_id = str(uuid4())  # unique ID for each record in the DB
            obj = form.save(commit=False)
            obj.created_by = request.user
            obj.unique_id = unique_id
            obj.status = 0
            obj.save()
            form.save_m2m()

            # flatten the task's keywords into a comma-separated string
            keywords = ', '.join(key.title for key in obj.keywords.all())

            task_ids = []  # one Task/Project contains one or more scrapy tasks

            settings = {
                'spider_count': len(obj.targets.all()),
                'keywords': keywords,
                'unique_id': unique_id,
                'USER_AGENT': 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
            }

            for site_url in obj.targets.all():
                domain = urlparse(site_url.address).netloc  # extract the domain from the URL
                spider_name = domain.replace('.com', '')
                task = scrapyd.schedule('default', spider_name, settings=settings,
                                        url=site_url.address, domain=domain,
                                        keywords=keywords)
                task_ids.append(task)

            return redirect('crawler:task-list')
        return render(request, 'index.html', {'form': form, 'errors': form.errors})

Any suggestions or answers for this problem?

Answer from ACi*_*der (score 7):

After five years of fighting Celery in a setup running 15k tasks/second, I strongly suggest you switch to Dramatiq. It has a sane, reliable, performant code base that isn't split across multiple convoluted packages, and it has worked flawlessly in both of my new projects so far.

From the author's motivation:

I've used Celery professionally for years, and my growing frustration with it is one of the reasons I developed dramatiq. Here are some of the main differences between Dramatiq, Celery and RQ:

There is also a Django helper package: https://github.com/Bogdanp/django_dramatiq
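With django_dramatiq installed, the crawl itself becomes a plain actor that any scheduler can enqueue; a minimal sketch (the actor body and the model lookup are placeholders, not part of django_dramatiq's API):

import dramatiq

from crawler.models import Task  # app label assumed


@dramatiq.actor(max_retries=3)
def crawl(pk):
    """Run one crawl for the given Task row."""
    task = Task.objects.get(pk=pk)
    # ... call scrapyd / the crawl logic for this task here ...


# enqueue it from a view, cron job or shell:
# crawl.send(task.pk)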

Of course you won't get a built-in celerybeat, but a cron job calling Python tasks is more robust anyway; we lost a huge amount of data because celerybeat would periodically decide to just stop running :)
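For example, a single crontab entry can drive a small Django management command that enqueues every due task (the command name and the dispatch logic here are illustrative, not from the answer):

# crontab: run at the top of every hour
# 0 * * * * /path/to/venv/bin/python /path/to/manage.py enqueue_crawls

# crawler/management/commands/enqueue_crawls.py
import datetime

from django.core.management.base import BaseCommand

from crawler.models import Task  # app label assumed
from crawler.tasks import crawl  # the actor sketched above


class Command(BaseCommand):
    help = 'Enqueue crawls for tasks that are still running and not past their end date.'

    def handle(self, *args, **options):
        today = datetime.date.today()
        due = Task.objects.filter(status__in=[Task.INITIAL, Task.STARTED],
                                  scraping_end_date__gte=today)
        for task in due:
            # a real version would also honour search_frequency, e.g. only
            # enqueue when today's hour % int(task.search_frequency) == 0
            crawl.send(task.pk)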


There are two projects that aim to add periodic task creation: https://gitlab.com/bersace/periodiq and https://apscheduler.readthedocs.io/en/stable/
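As a rough illustration of the APScheduler route (an untested sketch; the crawl callable and app label are assumptions), each row's search_frequency maps directly onto an interval job:

from apscheduler.schedulers.background import BackgroundScheduler

from crawler.models import Task  # app label assumed
from crawler.tasks import crawl  # any callable that runs one crawl

scheduler = BackgroundScheduler()

for task in Task.objects.filter(status__in=[Task.INITIAL, Task.STARTED]):
    scheduler.add_job(
        crawl,
        'interval',
        hours=int(task.search_frequency),  # '2' -> every 2 hours
        args=[task.pk],
        id=f'crawl-{task.pk}',             # stable id, so the job can be replaced
        replace_existing=True,
    )

scheduler.start()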

I haven't used these packages myself. With periodiq you could try selecting your database entries, looping over them, and defining a periodic task for each one (but this requires restarting the periodiq workers whenever the entries change, so they pick up the changes):

# tasks.py
import dramatiq
from dramatiq import get_broker
from periodiq import PeriodiqMiddleware, cron

from crawler.models import Task  # app label assumed

broker = get_broker()
broker.add_middleware(PeriodiqMiddleware(skip_delay=30))


for obj in Task.objects.all():
    # actor names must be unique, so derive one from the row's primary key;
    # cron() takes a crontab string, so map search_frequency ('2') onto one
    @dramatiq.actor(actor_name=f'crawl_{obj.pk}',
                    periodic=cron(f'0 */{int(obj.search_frequency)} * * *'))
    def periodic_crawl(obj=obj):
        # import logic based on obj.name
        # do something every search_frequency hours…
        ...