在Python-RQ中减少机器上工作者数量的好方法是什么?
根据文档,我需要向计算机上的一个工作进程发送SIGINT或SIGTERM命令:
取下工人
如果工作人员在任何时间接收
SIGINT(通过Ctrl+C)或SIGTERM(通过kill)工作人员等待当前正在运行的任务完成,则停止工作循环并优雅地注册其自己的死亡.如果,在此删除阶段,
SIGINT或SIGTERM再次收到,工人将强行终止子进程(发送它SIGKILL),但仍将尝试注册自己的死亡.
这似乎意味着很多编码开销:
我是否真的需要自定义构建它,或者有没有办法使用Python-RQ库或其他现有库轻松完成此操作?
I'm facing a basic issue while setting up python-rq - the rqworker doesn't seem to recognize jobs that are pushed to the queue it's listening on.
Everything is run inside virtualenv
I have the following code:
from redis import Redis
from rq import Queue
from rq.registry import FinishedJobRegistry
from videogen import videogen
import time
redis_conn = Redis(port=5001)
videoq = Queue('medium', connection=redis_conn)
fin_registry = FinishedJobRegistry(connection=redis_conn, name='medium')
jobid = 1024
job = videoq.enqueue(videogen, jobid)
while not job.is_finished:
time.sleep(2)
print job.result
Run Code Online (Sandbox Code Playgroud)
Here videogen …
我不太了解 python rq,我刚刚开始学习它。
有一个task_a需要3分钟才能完成处理。
@job
def task_a():
time.sleep(180)
print('done processing task_a')
def call_3_times():
task_a.delay()
task_a.delay()
task_a.delay()
Run Code Online (Sandbox Code Playgroud)
据我观察,task_a将从队列中一一执行。第一个呼叫结束后,再进行下一个呼叫,依此类推。总时间为 3 分钟 x 3 = 9 分钟
如何使每个task_aincall_3_times函数并行执行?所以所花费的时间少于 9 分钟,可能是 3 分 10 秒(只是一个例子,它可能会比这更快)。
也许我需要生成 3 个 rq 工作人员,是的,它确实工作得更快并且像并行一样。但如果我需要调用它 2000 次怎么办?我应该生成 2000 个 rq 工人吗?我的意思是,必须有更好的方法来做到这一点。
与此问题类似,有什么方法可以将同一文件中定义的函数提交给 python-rq?@GG_Python 谁让我为此创建一个新问题。
用法示例:
# somemodule.py
from redis import Redis
from rq import Queue
def somefunc():
do_something()
q = Queue(connection=Redis('redis://redis'))
q.enqueue(somefunc)
Run Code Online (Sandbox Code Playgroud)
是的,我知道答案是在 someothermodule.py 中定义 somefunc ,然后在上面的代码段中定义from someothermodule import somefunc,但我真的不想。也许我对表单过于执着,但 somefunc 确实属于它排队的同一个文件(实际上, somefunc 接受一个 docker 容器名称并生成它)。我真的希望整个事情都是自包含的,而不是有两个模块。
我注意到,通过挖掘 python-rq 源代码,Queue.enqueue 实际上可以接受一个字符串而不是实际的模块,所以我希望我可以通过somemodule.somefunc,但没有那么幸运。有任何想法吗?
我正在尝试测试排队的 redis 作业,但是 meta数据似乎没有在任务和发起者之间传递。job_id 似乎匹配,所以我很困惑。也许一些新鲜的眼睛可以帮助我解决这个问题:
任务是根据文档:
from rq import get_current_job
def do_test(word):
job = get_current_job()
print job.get_id()
job.meta['word'] = word
job.save()
print "saved: ", job.meta['word']
return True
Run Code Online (Sandbox Code Playgroud)
该rqworker日志打印JOB_ID和Word在保存后
14:32:32 *** Listening on default...
14:33:07 default: labeller.do_test('supercalafragelistic') (a6e2e579-df26-411a-b017-8788d621149f)
a6e2e579-df26-411a-b017-8788d621149f
saved: supercalafragelistic
14:33:07 Job OK, result = True
14:33:07 Result is kept for 500 seconds.
Run Code Online (Sandbox Code Playgroud)
该任务是从单元测试调用的:
class RedisQueueTestCase(unittest.TestCase):
"""
Requires running "rqworker" on the localhost cmdline
"""
def setUp(self):
use_connection()
self.q = Queue()
def test_enqueue(self):
job = …Run Code Online (Sandbox Code Playgroud) 我在Heroku应用程序中使用Django-RQ来处理后台任务.
当我的后台任务发生错误时,它不会被发送到Sentry.
settings.py中的日志记录设置如下:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'WARNING',
'handlers': ['sentry'],
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse'
}
},
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s '
'%(process)d %(thread)d %(message)s'
},
"rq_console": {
"format": "%(asctime)s %(message)s",
"datefmt": "%H:%M:%S",
},
},
'handlers': {
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
},
"rq_console": {
"level": "DEBUG",
"class": "rq.utils.ColorizingStreamHandler",
"formatter": "rq_console",
"exclude": ["%(asctime)s"],
},
'sentry': {
'level': 'ERROR',
'class': 'raven.contrib.django.raven_compat.handlers.SentryHandler'
},
'console': {
'level': 'DEBUG', …Run Code Online (Sandbox Code Playgroud) 我在队列失败时失败 1次.
$ rq info
failed |? 1
1 queues, 1 jobs total
Run Code Online (Sandbox Code Playgroud)
作为回答的@Byron露丝,我能得到这个数字是这样的:
from rq import Queue
from redis import Redis
q = Queue('failed', connection=Redis())
print len (q.jobs)
Run Code Online (Sandbox Code Playgroud)
在rq-dashboard上,我看到了回溯:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/rq/worker.py", line 588, in perform_job
rv = job.perform()
...
Run Code Online (Sandbox Code Playgroud)
如何使用Python代码获取此回溯?如果这是不可能的,任何命令行解决方案都是可接受的(如Bash或类似的).
我正在尝试运行该rq info命令,但我想在远程 redis 机器上获取信息。如何指定redis机器的url?
我正在尝试使用RQ创建后台作业:
import django_rq
def _send_password_reset_email_async(email):
print(email)
# Django admin action to send reset password emails
def send_password_reset_email(modeladmin, request, queryset):
for user in queryset:
django_rq.enqueue(_send_password_reset_email_async, user.email)
send_password_reset_email.short_description = 'Send password reset email'
Run Code Online (Sandbox Code Playgroud)
我一直收到此错误,似乎我在做一些愚蠢的事情?
Traceback (most recent call last):
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/worker.py", line 568, in perform_job
rv = job.perform()
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/job.py", line
495, in perform
self._result = self.func(*self.args, **self.kwargs)
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/job.py", line 206, in func
return import_attribute(self.func_name)
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/utils.py", line 151, in import_attribute
return getattr(module, attribute)
AttributeError: 'module' object has no attribute '_send_password_reset_email_async …Run Code Online (Sandbox Code Playgroud) 我有一个从网络服务器下载资源的 Python RQ 作业。
如果网络服务器无响应,下载作业是否可以自行重新安排并在一定时间间隔后重试下载?
多个转换作业依赖于下载作业
job_queue.enqueue(transformation_task, depends_on=download_job)
如果下载作业可以自行重新安排,那么相关作业是否会保留下来,并在下载作业完成后最终执行?