Ale*_*ysh 16 scalability scheduled-tasks redis
我需要设计一个Redis驱动的可扩展任务调度系统.
要求:
伪API : schedule_task(timestamp, task_data). 时间戳以整数秒为单位.
基本理念:
到目前为止,我无法弄清楚如何在Redis原语中使用它...
有线索吗?
请注意,有一个类似的旧问题:使用Redis延迟执行/调度?在这个新问题中,我介绍了更多细节(最重要的是,许多工人).到目前为止,我无法弄清楚如何在这里应用旧答案 - 因此,这是一个新问题.
Dan*_*amy 10
这是另一个基于其他几个解决方案的解决方案[1].它使用redis WATCH命令删除竞争条件,而不使用redis 2.6中的lua.
基本方案是:
我没有测试过:-)
foo工作创建者会这样做:
def schedule_task(queue, data, delay_secs):
# This calculation for run_at isn't great- it won't deal well with daylight
# savings changes, leap seconds, and other time anomalies. Improvements
# welcome :-)
run_at = time.time() + delay_secs
# If you're using redis-py's Redis class and not StrictRedis, swap run_at &
# the dict.
redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})
schedule_task('foo_queue', foo_data, 60)
Run Code Online (Sandbox Code Playgroud)
调度员看起来像:
while working:
redis.watch(SCHEDULED_ZSET_KEY)
min_score = 0
max_score = time.time()
results = redis.zrangebyscore(
SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
if results is None or len(results) == 0:
redis.unwatch()
sleep(1)
else: # len(results) == 1
redis.multi()
redis.rpush(results[0]['queue'], results[0]['data'])
redis.zrem(SCHEDULED_ZSET_KEY, results[0])
redis.exec()
Run Code Online (Sandbox Code Playgroud)
foo工作者看起来像:
while working:
task_data = redis.blpop('foo_queue', POP_TIMEOUT)
if task_data:
foo(task_data)
Run Code Online (Sandbox Code Playgroud)
[1]此解决方案基于not_a_golfer,一个位于http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/,以及redis docs for transactions.
您没有指定您正在使用的语言.至少有三种方法可以在Python中编写一行代码.
Celery有一个可选的redis代理. http://celeryproject.org/
resque是一个非常流行的使用redis的redis任务队列. https://github.com/defunkt/resque
RQ是一个简单的小型基于redis的队列,旨在"从celery和resque中获取好东西",并且更加简单易用. http://python-rq.org/
如果你不能使用它们,你至少可以看看它们的设计.
但要回答你的问题 - 你想要什么可以用redis完成.实际上,我过去或多或少都写过.
编辑:至于在redis上建模你想要的东西,这就是我要做的:
使用时间戳排队任务将由客户端直接完成 - 您将任务放在一个有序集合中,时间戳作为分数,任务作为值(参见ZADD).
中央调度程序每隔N秒唤醒一次,检查此集合上的第一个时间戳,如果有任务准备好执行,它会将任务推送到"现在要执行"列表.这可以通过"等待"排序集上的ZREVRANGEBYSCORE来完成,获取时间戳<= now的所有项目,这样您就可以立即获得所有准备好的项目.推动由RPUSH完成.
工作人员在"现在要执行"列表中使用BLPOP,当有东西可以工作时唤醒,并做他们的事情.这是安全的,因为redis是单线程的,并且没有2名工作人员将执行相同的任务.
完成后,工作人员将结果放回响应队列,由队列或其他线程检查.您可以添加"待处理"存储桶以避免故障或类似情况.
所以代码看起来像这样(这只是伪代码):
客户:
ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>
Run Code Online (Sandbox Code Playgroud)
调度员:
while working:
tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
for task in tasks:
#do the delete and queue as a transaction
MULTI
RPUSH "to_be_executed" task
ZREM "new_tasks" task
EXEC
sleep(1)
Run Code Online (Sandbox Code Playgroud)
我没有添加响应队列处理,但它或多或少像工作者:
工人:
while working:
task = BLPOP "to_be_executed" <TIMEOUT>
if task:
response = work_on_task(task)
RPUSH "results" response
Run Code Online (Sandbox Code Playgroud)
编辑:无状态原子调度员:
while working:
MULTI
ZREVRANGE "new_tasks" 0 1
ZREMRANGEBYRANK "new_tasks" 0 1
task = EXEC
#this is the only risky place - you can solve it by using Lua internall in 2.6
SADD "tmp" task
if task.timestamp <= now:
MULTI
RPUSH "to_be_executed" task
SREM "tmp" task
EXEC
else:
MULTI
ZADD "new_tasks" task.timestamp task
SREM "tmp" task
EXEC
sleep(RESOLUTION)
Run Code Online (Sandbox Code Playgroud)
综合方法似乎是可行的:
\n\n新任务时间戳不能小于当前时间(如果小于则钳制)。假设 NTP 同步可靠。
所有任务都会进入键处的存储桶列表,并以任务时间戳为后缀。
此外,所有任务时间戳都转到专用 zset(键和分数 \xe2\x80\x94 时间戳本身)。
通过单独的 Redis 列表从客户端接受新任务。
循环:通过 zrangebyscore ... limit 获取最早的 N 个过期时间戳。
新任务列表和获取时间戳列表上有超时的 BLPOP。
如果有旧任务,请处理它。如果新的 \xe2\x80\x94 添加到存储桶和 zset。
检查已处理的桶是否为空。如果是 \xe2\x80\x94 则删除 list 并从 zset 中进入。可能不检查最近过期的存储桶,以防止出现时间同步问题。结束循环。
批判?评论?备择方案?
\n| 归档时间: |
|
| 查看次数: |
13953 次 |
| 最近记录: |