我正在使用http://python-rq.org/在Heroku worker dynos上排队和执行任务.这些是长时间运行的任务,偶尔我需要在执行中期取消它们.我怎么用Python做到这一点?
from redis import Redis
from rq import Queue
from my_module import count_words_at_url
q = Queue(connection=Redis())
result = q.enqueue(
count_words_at_url, 'http://nvie.com')
Run Code Online (Sandbox Code Playgroud)
后来我想做一个单独的过程:
from redis import Redis
from rq import Queue
from my_module import count_words_at_url
q = Queue(connection=Redis())
result = q.revoke_all() # or something
Run Code Online (Sandbox Code Playgroud)
谢谢!
我已经开始使用 RQ/Redis 为我的 django 站点构建一些长时间运行的作业的异步执行。我希望做如下事情:
我想要一个模型的每个实例一个队列。你可以把这个模型想象成一个 api 用户帐户。(这些不会很多。最多 15 - 20 个)
我将在队列中均匀地分配批次的任务(从 10 到 500 之间的任意位置)。在第一批完成之前可以添加多批。
对于每个批次,我想为每个没有积极处理的队列启动一个工作程序,我想以批处理模式运行这些工作程序,以便一旦它们用完任务,它们就会关闭。
我意识到我不能以批处理模式运行它们,然后我将始终在所有队列上工作/监听工作。这样做的问题是我希望能够动态添加和删除队列,所以最好每批启动可用队列。
我意识到我在队列之间分配任务似乎很奇怪,但原因是同一队列中的每个任务必须根据我使用的服务进行速率限制/节流(将其视为 API速率限制,但每个队列代表不同的帐户)。但就我的目的而言,任务在哪个帐户上运行没有区别,所以我不妨跨所有帐户并行化。
我面临的问题是,如果我启动一个工人并给它一个已经在处理的队列,我现在有两个工人在该队列上独立运行,因此我预期的节流率会减半。如果没有工作人员在该队列上运行,我如何仅启动工作人员?我可能会为此找到一个 hacky 解决方案,但我更愿意以“正确”的方式处理它,因为我对队列没有太多经验,所以我想我应该问一下。
我已经在实现我自己的工作类,以便我可以动态控制队列,所以我只需要一种方法来添加逻辑,如果该队列已经在处理,则不会给它一个新的工作人员。我的工人的一个简单版本在这里:
# custom_worker.py
import sys
from Api.models import *
from rq import Queue, Connection, Worker
# importing the necessary namespace for the tasks to run
from tasks import *
# dynamically getting the queue names in which I am expecting tasks
queues = [user.name for user in ApiUser.objects.all()]
with Connection():
qs = list(map(Queue, queues)) or [Queue()] …Run Code Online (Sandbox Code Playgroud) 所有,我都试图“强制” RQ 工作人员使用 supervisord 并发执行。我的设置 supervisord 设置似乎工作正常,因为 rq-dashboard 显示 3 个工人、3 个 PID 和 3 个队列(每个工人/PID 一个)。Supervisord 设置如下(仅显示工人 1 设置,在此下面定义了另外 2 个工人):
[program:rqworker1]
command = rqworker 1
process_name = rqworker1-%(process_num)s
numprocs = 1
user = username
autostart = True
stdout_logfile=/tmp/rqworker1.log
stdout_logfile_maxbytes=50MB
Run Code Online (Sandbox Code Playgroud)

问题是当我同时发送 3 个作业时,运行的总时间是单个任务的 x3(即,总时间与任务数量成线性关系,这可扩展为 x4、x5 等)。似乎没有可用的并发。我还通过将新作业发送到具有最少启动+排队作业的队列来实现原始负载平衡,这工作正常(观察到作业在队列中均匀分布)。
为什么这个设置不允许并发?
关于我缺少的设置的任何考虑因素?
请注意,当我迁移到 PY3 时,rq-gevent-worker 包(在早期的 wrt 并发/RQ 中运行良好)不再可用,并且 PY3 尚不支持 gevent 本身。但这给了我一个线索,即并发是可能的。
所以,RQ明确指出我可以在这里排队一个对象的实例方法,所以我一直试图这样做,但得到一个PicklingError:
q.enqueue(some_obj.some_func, some_data)
*** PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
真的,我只需要在我的方法中访问SQL连接,所以我试着让它成为一个明确接受SQL连接的函数.那也失败了:
q.enqueue(some_func, sql_sess, some_data)
*** PicklingError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?我做错了什么,或者图书馆坏了吗?
我正在使用python-rq来管理基于Redis的作业,我想确定我的工作人员当前正在处理哪些作业.
python-rq提供了一个get_current_job函数来查找连接的"当前作业",但是:
这是我的代码(总是返回None):
from rq import Queue, get_current_job
redis_url = os.getenv('REDIS_FOO')
parse.uses_netloc.append('redis')
url = parse.urlparse(redis_url)
conn = Redis(host=url.hostname, port=url.port, db=0, password=url.password)
q = Queue(connection=conn)
get_current_job(connection=conn)
Run Code Online (Sandbox Code Playgroud)
有没有人有任何想法,请让上面的代码工作,但更重要的是,从这个连接获得所有队列中所有工作人员的所有当前工作的列表?
在阅读rq文档时,我注意到rq worker在启动工作程序时可以传递一些参数
例:
rq worker --worker-class 'foo.bar.MyWorker'
Run Code Online (Sandbox Code Playgroud)
参数列表包括
--worker-class或-w:要使用的RQ Worker类(例如rq worker --worker-class'foo.bar.MyWorker')--job-class或-j:要使用的RQ Job类。--queue-class:要使用的RQ Queue类。什么是工作者类,作业类和队列类,何时使用它们?
我的第一个问题/帖子...请善待....
我正在做一个个人项目,其中一个模块循环运行以收集数据。当数据进入时,它会将数据插入数据库交给队列中的一个函数,在那里一个侦听 rq 工作器接收它并处理该函数。数据库使用 SQLAlchemy 进行管理,这意味着它必须生成引擎、会话并定义数据库表。
代码文件的结构是:
--/home/..../collect-view/ (this is the project folder)
-- DataCollection
-- main_client.py (main loop waiting for user data)
-- collect_data.py (contains the database insertion function)
-- base.py (the base file for SQLAlchemy database definition)
-- tables.py (the file which sets up the table name and definition)
-- app.db (the database file)
Run Code Online (Sandbox Code Playgroud)
注意:数据库文件位于更高级别的目录中,因为它也被另一个位于此级别的应用程序(Flask 应用程序)访问
要实现此代码,“collect_data”必须导入“base”和“tables”,“tables”必须导入“base”。这被证明是一个问题,因为一旦 collect_data 函数(称为“传输”)由工作人员运行,它就无法再找到要导入的文件,并且工作人员会吐出一个异常,说它无法导入“base ”。我在网上搜索了答案,最终在 nvie 的 Github 上找到了一个答案,其中提到使用 --path 选项将工作人员引导到正确的路径。我通过实现它来工作:
$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection
Run Code Online (Sandbox Code Playgroud)
然后我遇到了另一个与路径相关的失败,其中工作人员说它找不到我试图将数据插入的数据库表。所以我更改了引擎创建步骤以包含我的完整路径......
base_url = '/home/.../collect_view/'
engine = …Run Code Online (Sandbox Code Playgroud) 我想使用 rq 在单独的工作人员上运行任务,以从测量仪器收集数据。用户按下仪表板应用程序上的按钮将发出任务结束的信号。问题是任务本身不知道何时终止,因为它无权访问 dash 应用程序的上下文。
我已经习惯meta将信息从工作人员传递回调用者,但是我可以将信息从调用者传递给工作人员吗?
示例任务:
from rq import get_current_job
from time import time
def mock_measurement():
job = get_current_job()
t_start = time()
# Run the measurement
t = []
i = []
job.meta['should_stop'] = False # I want to use this tag to tell the job to stop
while not job.meta['should_stop']:
t.append(time() - t_start)
i.append(np.random.random())
job.meta['data'] = (t, i)
job.save_meta()
sleep(5)
print("Job Finished")
Run Code Online (Sandbox Code Playgroud)
从控制台,我可以开始这样的工作
queue = rq.Queue('test-app', connection=Redis('localhost', 6379))
job = queue.enqueue('tasks.mock_measurement')
Run Code Online (Sandbox Code Playgroud)
我希望能够从控制台执行此操作,以向工作人员表明它可以停止运行:
job.meta['should_stop'] = True
job.save_meta() …Run Code Online (Sandbox Code Playgroud) 我有一个 Web 服务(Python 3.7、Flask 1.0.2),其工作流由 3 个步骤组成:
远程计算作业的长度是任意的(在秒和天之间),每一步都依赖于上一步的完成:
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
job1 = q.enqueue(step1)
job2 = q.enqueue(step2, depends_on=job1)
job3 = q.enqueue(step3, depends_on=job2)
Run Code Online (Sandbox Code Playgroud)
但是,最终所有工作人员(4 名工作人员)都将进行轮询(4 个客户端请求中的第 2 步),而他们应该继续执行其他传入请求的第 1 步和已成功通过第 2 步的那些工作流的第 3 步。
工人应该在每次投票后被释放。他们应该定期返回第 2 步进行下一次轮询(每个作业最多每 61 秒一次),如果远程计算作业轮询未返回“DONE”,则重新排队轮询作业。
此时我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
s = Scheduler('default')
job1 = q.enqueue(step1, REQ_ID)
job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
job2.meta['interval'] = 61
job2.origin …Run Code Online (Sandbox Code Playgroud) 我只需要为我的一个队列(下面的“高”优先级)启动多个工作线程。如何在我用来启动工作人员的工作人员脚本的上下文中执行此操作?
from config import Config
from redis import from_url
from rq import Worker, Queue, Connection
listen = ['high','mid','low']
conn = from_url(Config.REDIS_URL)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)),log_job_description=True,)
worker.work()
Run Code Online (Sandbox Code Playgroud)
工作脚本本身由主管进程调用,该进程为我的每个队列生成 2 个工作实例。
[supervisord]
[program:worker]
command=python -m worker
process_name=%(program_name)s-%(process_num)s
numprocs=2
directory=.
stopsignal=TERM
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
redirect_stderr=true
Run Code Online (Sandbox Code Playgroud)
如果我想让 3 个工作人员为“高”队列做好准备,但只有 2 个工作人员为“中”和“低”队列做好准备,我该如何实现这一目标?
我尝试以“突发”模式启动工人,但如果没有足够的工作机会,这也会杀死工人。我可以接受一种解决方案,它可以像爆发一样自动扩展工作人员,但始终保持至少一个工作人员处于活动状态。
python-rq ×10
python ×9
redis ×5
python-3.x ×2
task-queue ×2
concurrency ×1
django ×1
flask ×1
heroku ×1
plotly-dash ×1
queue ×1
sqlalchemy ×1
supervisord ×1