D_P*_*D_P 1 django django-views celery web-scraping
在这里,我有一个视图CrawlerHomeView,用于从表单创建任务对象,现在我想用 celery 定期安排此任务。
我想CrawlerHomeView用任务对象 search_frequency 并通过检查一些任务对象字段来安排这个过程。
任务模型
class Task(models.Model):
INITIAL = 0
STARTED = 1
COMPLETED = 2
task_status = (
(INITIAL, 'running'),
(STARTED, 'running'),
(COMPLETED, 'completed'),
(ERROR, 'error')
)
FREQUENCY = (
('1', '1 hrs'),
('2', '2 hrs'),
('6', '6 hrs'),
('8', '8 hrs'),
('10', '10 hrs'),
)
name = models.CharField(max_length=255)
scraping_end_date = models.DateField(null=True, blank=True)
search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
status = models.IntegerField(choices=task_status)
Run Code Online (Sandbox Code Playgroud)
任务.py
如果任务状态为 0 或 1 且未超过任务抓取结束日期,我想定期运行下面发布的视图 [期间 =(任务的搜索频率时间])。但我卡在这里了。我该怎么做?
@periodic_task(run_every=crontab(hour="task.search_frequency")) # how to do with task search_frequency value
def schedule_task(pk):
task = Task.objects.get(pk=pk)
if task.status == 0 or task.status == 1 and not datetime.date.today() > task.scraping_end_date:
# perform the crawl function ---> def crawl() how ??
if task.scraping_end_date == datetime.date.today():
task.status = 2
task.save() # change the task status as complete.
Run Code Online (Sandbox Code Playgroud)
视图.py
我想定期运行此视图。我该怎么做?
class CrawlerHomeView(LoginRequiredMixin, View):
login_url = 'users:login'
def get(self, request, *args, **kwargs):
# all_task = Task.objects.all().order_by('-id')
frequency = Task()
categories = Category.objects.all()
targets = TargetSite.objects.all()
keywords = Keyword.objects.all()
form = CreateTaskForm()
context = {
'targets': targets,
'keywords': keywords,
'frequency': frequency,
'form':form,
'categories': categories,
}
return render(request, 'index.html', context)
def post(self, request, *args, **kwargs):
form = CreateTaskForm(request.POST)
if form.is_valid():
# try:
unique_id = str(uuid4()) # create a unique ID.
obj = form.save(commit=False)
# obj.keywords = keywords
obj.created_by = request.user
obj.unique_id = unique_id
obj.status = 0
obj.save()
form.save_m2m()
keywords = ''
# for keys in ast.literal_eval(obj.keywords.all()): #keywords change to csv
for keys in obj.keywords.all():
if keywords:
keywords += ', ' + keys.title
else:
keywords += keys.title
# tasks = request.POST.get('targets')
# targets = ['thehimalayantimes', 'kathmandupost']
# print('$$$$$$$$$$$$$$$ keywords', keywords)
task_ids = [] #one Task/Project contains one or multiple scrapy task
settings = {
'spider_count' : len(obj.targets.all()),
'keywords' : keywords,
'unique_id': unique_id, # unique ID for each record for DB
'USER_AGENT': 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
}
# res = ast.literal_eval(ini_list)
for site_url in obj.targets.all():
domain = urlparse(site_url.address).netloc # parse the url and extract the domain
spider_name = domain.replace('.com', '')
task = scrapyd.schedule('default', spider_name, settings=settings, url=site_url.address, domain=domain, keywords=keywords)
# task = scrapyd.schedule('default', spider_name , settings=settings, url=obj.targets, domain=domain, keywords=obj.keywords)
return redirect('crawler:task-list')
# except:
# return render(request, 'index.html', {'form':form})
return render(request, 'index.html', {'form':form, 'errors':form.errors})
Run Code Online (Sandbox Code Playgroud)
这个问题有什么建议或答案吗?
在以 15k 任务/秒的设置与 Celery 抗争 5 年之后,我强烈建议您切换到Dramatiq,它具有健全、可靠、高性能的代码库,不会拆分到多个复杂的包中,并且在我的两个新项目中完美运行迄今为止。
我已经专业地使用 Celery 多年,我对它越来越感到沮丧是我开发dramatiq 的原因之一。以下是 Dramatiq、Celery 和 RQ 之间的一些主要区别:
还有一个 Django 帮助包:https : //github.com/Bogdanp/django_dramatiq
当然,你不会有一个内置的 celerybeat,但是调用 python 任务的 cron 更健壮,我们丢失了大量数据,因为 celerybeat 决定定期停止:)
有两个项目旨在添加周期性任务创建:https : //gitlab.com/bersace/periodiq和https://apscheduler.readthedocs.io/en/stable/
我还没有使用过这些包,您可以尝试使用 periodiq 选择您的数据库条目,遍历这些条目并为每个条目定义一个周期任务(但这需要定期重启 periodiq 工作人员以获取更改):
# tasks.py
from dramatiq import get_broker
from periodiq import PeriodiqMiddleware, cron
broker = get_broker()
broker.add_middleware(PeriodiqMiddleware(skip_delay=30))
for obj in Task.objects.all():
@dramatiq.actor(periodic=cron(obj.frequency))
def hourly(obj=obj):
# import logic based on obj.name
# Do something each hour…
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
938 次 |
| 最近记录: |