使用Redis可扩展的延迟任务执行

Ale*_*ysh 16 scalability scheduled-tasks redis

我需要设计一个Redis驱动的可扩展任务调度系统.

要求:

  • 多个工作进程.
  • 许多任务,但长时间的闲置是可能的.
  • 合理的计时精度.
  • 闲置时资源浪费最少.
  • 应该使用同步Redis API.
  • 应该适用于Redis 2.4(即即将推出的2.6中没有功能).
  • 不应该使用RPC的其他方法而不是Redis.

伪API : schedule_task(timestamp, task_data). 时间戳以整数秒为单位.

基本理念:

  • 在列表中收听即将发生的任务.
  • 每个时间戳将任务放入存储桶.
  • 睡到最近的时间戳.
  • 如果出现时间戳小于最接近的任务的新任务,则唤醒.
  • 使用时间戳≤现在,批量处理所有即将执行的任务(假设任务执行速度很快).
  • 确保并发工作程序不会处理相同的任务.同时,如果我们在处理它们时崩溃,请确保没有任何任务丢失.

到目前为止,我无法弄清楚如何在Redis原语中使用它...

有线索吗?

请注意,有一个类似的旧问题:使用Redis延迟执行/调度?在这个新问题中,我介绍了更多细节(最重要的是,许多工人).到目前为止,我无法弄清楚如何在这里应用旧答案 - 因此,这是一个新问题.

Dan*_*amy 10

这是另一个基于其他几个解决方案的解决方案[1].它使用redis WATCH命令删除竞争条件,而不使用redis 2.6中的lua.

基本方案是:

  • 将redis zset用于计划任务,使用redis队列执行准备运行任务.
  • 让调度程序轮询zset并将准备运行的任务移动到redis队列中.您可能需要多个调度程序来实现冗余,但您可能不需要或不需要许多调度程序.
  • 拥有尽可能多的工作人员阻止redis队列上的弹出窗口.

我没有测试过:-)

foo工作创建者会这样做:

def schedule_task(queue, data, delay_secs):
    # This calculation for run_at isn't great- it won't deal well with daylight
    # savings changes, leap seconds, and other time anomalies. Improvements
    # welcome :-)
    run_at = time.time() + delay_secs

    # If you're using redis-py's Redis class and not StrictRedis, swap run_at &
    # the dict.
    redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})

schedule_task('foo_queue', foo_data, 60)
Run Code Online (Sandbox Code Playgroud)

调度员看起来像:

while working:
    redis.watch(SCHEDULED_ZSET_KEY)
    min_score = 0
    max_score = time.time()
    results = redis.zrangebyscore(
        SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
    if results is None or len(results) == 0:
        redis.unwatch()
        sleep(1)
    else: # len(results) == 1
        redis.multi()
        redis.rpush(results[0]['queue'], results[0]['data'])
        redis.zrem(SCHEDULED_ZSET_KEY, results[0])
        redis.exec()
Run Code Online (Sandbox Code Playgroud)

foo工作者看起来像:

while working:
    task_data = redis.blpop('foo_queue', POP_TIMEOUT)
    if task_data:
        foo(task_data)
Run Code Online (Sandbox Code Playgroud)

[1]此解决方案基于not_a_golfer,一个位于http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/,以及redis docs for transactions.

  • 如果它对任何人感兴趣,我已经创建了上述的Java实现......经过全面测试和功能.https://github.com/davidmarquis/redis-scheduler (2认同)

Not*_*fer 7

您没有指定您正在使用的语言.至少有三种方法可以在Python中编写一行代码.

  1. Celery有一个可选的redis代理. http://celeryproject.org/

  2. resque是一个非常流行的使用redis的redis任务队列. https://github.com/defunkt/resque

  3. RQ是一个简单的小型基于redis的队列,旨在"从celery和resque中获取好东西",并且更加简单易用. http://python-rq.org/

如果你不能使用它们,你至少可以看看它们的设计.

但要回答你的问题 - 你想要什么可以用redis完成.实际上,我过去或多或少都写过.

编辑:至于在redis上建模你想要的东西,这就是我要做的:

  1. 使用时间戳排队任务将由客户端直接完成 - 您将任务放在一个有序集合中,时间戳作为分数,任务作为值(参见ZADD).

  2. 中央调度程序每隔N秒唤醒一次,检查此集合上的第一个时间戳,如果有任务准备好执行,它会将任务推送到"现在要执行"列表.这可以通过"等待"排序集上的ZREVRANGEBYSCORE来完成,获取时间戳<= now的所有项目,这样您就可以立即获得所有准备好的项目.推动由RPUSH完成.

  3. 工作人员在"现在要执行"列表中使用BLPOP,当有东西可以工作时唤醒,并做他们的事情.这是安全的,因为redis是单线程的,并且没有2名工作人员将执行相同的任务.

  4. 完成后,工作人员将结果放回响应队列,由队列或其他线程检查.您可以添加"待处理"存储桶以避免故障或类似情况.

所以代码看起来像这样(这只是伪代码):

客户:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>
Run Code Online (Sandbox Code Playgroud)

调度员:

while working:
   tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
   for task in tasks:

       #do the delete and queue as a transaction
       MULTI
       RPUSH "to_be_executed" task
       ZREM "new_tasks" task
       EXEC

   sleep(1)
Run Code Online (Sandbox Code Playgroud)

我没有添加响应队列处理,但它或多或少像工作者:

工人:

while working:
   task = BLPOP "to_be_executed" <TIMEOUT>
   if task:
      response = work_on_task(task)
      RPUSH "results" response
Run Code Online (Sandbox Code Playgroud)

编辑:无状态原子调度员:

while working:

   MULTI
   ZREVRANGE "new_tasks" 0 1
   ZREMRANGEBYRANK "new_tasks" 0 1
   task = EXEC

   #this is the only risky place - you can solve it by using Lua internall in 2.6
   SADD "tmp" task

   if task.timestamp <= now:
       MULTI
       RPUSH "to_be_executed" task
       SREM "tmp" task
       EXEC
   else:

       MULTI
       ZADD "new_tasks" task.timestamp task
       SREM "tmp" task
       EXEC

   sleep(RESOLUTION)
Run Code Online (Sandbox Code Playgroud)


Ale*_*ysh 0

综合方法似乎是可行的:

\n\n
    \n
  1. 新任务时间戳不能小于当前时间(如果小于则钳制)。假设 NTP 同步可靠。

  2. \n
  3. 所有任务都会进入键处的存储桶列表,并以任务时间戳为后缀。

  4. \n
  5. 此外,所有任务时间戳都转到专用 zset(键和分数 \xe2\x80\x94 时间戳本身)。

  6. \n
  7. 通过单独的 Redis 列表从客户端接受新任务。

  8. \n
  9. 循环:通过 zrangebyscore ... limit 获取最早的 N 个过期时间戳。

  10. \n
  11. 新任务列表和获取时间戳列表上有超时的 BLPOP。

  12. \n
  13. 如果有旧任务,请处理它。如果新的 \xe2\x80\x94 添加到存储桶和 zset。

  14. \n
  15. 检查已处理的桶是否为空。如果是 \xe2\x80\x94 则删除 list 并从 zset 中进入。可能不检查最近过期的存储桶,以防止出现时间同步问题。结束循环。

  16. \n
\n\n

批判?评论?备择方案?

\n