状态机在特定时间转换

wes*_*son 11 queue ruby-on-rails state-machine

简化示例:

我有一个待办事项.它可以是未来,当前或晚期,具体取决于它的时间.

  Time       State
  8:00 am    Future
  9:00 am    Current
  10:00 am   Late
Run Code Online (Sandbox Code Playgroud)

因此,在此示例中,待办事项从上午9点到上午10点是"当前".

最初,我考虑为"current_at"和"late_at"添加字段,然后使用实例方法返回状态.我可以查询所有"当前"待办事项now > current and now < late.

简而言之,我每次都会计算状态,或者使用SQL来提取我需要的状态集.

如果我想使用状态机,我会有一组状态,并将该状态名称存储在待办事项上.但是,如何在每个待办事项的特定时间触发州之间的过渡?

  • 每分钟运行一次cron作业以拉出状态但超过转换时间并更新它
  • 使用后台处理在将来的适当时间对转换作业进行排队,因此在上面的示例中,我将有两个作业:"在上午9点过渡到当前"和"在上午10点过渡到晚上"可能有逻辑来保护反对删除的待办事项和"如果完成则不要迟到"等等.

在尝试在特定时间处理大量状态转换时,是否有人有管理这些选项的经验?

感觉就像一台状态机,我只是不确定管理所有这些转换的最佳方法.

回复后更新:

  • 是的,我需要查询"当前"或"未来"待办事项
  • 是的,我需要触发关于状态变化的通知("你的待办事项没有完成")

因此,我希望更多的是类似状态机的想法,以便我可以封装过渡.

Mic*_*uff 6

我设计并维护了几个管理大量这些小型状态机的系统.(某些系统,最高100K /天,约100K /分钟)

我发现你明确表达的状态越多,就越有可能打破某个地方.或者换句话说,推断出的状态越多,解决方案就越健壮.

话虽如此,你必须保持一些状态.但尽量保持尽可能小.

此外,将状态机逻辑保持在一个位置使系统更健壮,更易于维护.也就是说,不要将状态机逻辑放在代码和数据库中.我更喜欢代码中的逻辑.

首选解决方案 (简单的图片是最好的).

对于你的例子,我会有一个非常简单的表:

task_id, current_at, current_duration, is_done, is_deleted, description...
Run Code Online (Sandbox Code Playgroud)

推断基于状态now的关系current_atcurrent_duration.这非常有效.确保对表进行索引/分区current_at.

处理过渡变化的逻辑

当您需要在转换更改时触发事件时,情况会有所不同.

将表格更改为如下所示:

task_id, current_at, current_duration, state, locked_by, locked_until, description...
Run Code Online (Sandbox Code Playgroud)

保持索引current_at,并根据需要添加一个state.您现在正在重整的状态,这样的事情是有点更脆弱,由于并发或失败,所以我们必须撑住它使用一点点locked_bylocked_until乐观锁,我将在下面介绍.

我假设你的程序在处理过程中会失败 - 即使只是为了部署.

您需要一种机制将任务从一种状态转换为另一种状态.为了简化讨论,我将关注从FUTURE转到CURRENT,但无论过渡如何,逻辑都是一样的.

如果您的数据集足够大,您不断轮询数据库以发现发现需要转换的任务(当然,当没有任何事情可做时,线性或指数退避); 否则你使用或你最喜欢的调度程序是基于 cron还是ruby,如果你订阅了Java/Scala/C#,则使用Quartz.

选择需要从FUTURE移动到CURRENT 当前未锁定的所有条目.

(更新 :)

-- move from pending to current
select task_id
  from tasks
 where now >= current_at
   and (locked_until is null OR locked_until < now)
   and state == 'PENDING'
   and current_at >= (now - 3 days)         -- optimization
 limit :LIMIT                               -- optimization
Run Code Online (Sandbox Code Playgroud)

把所有这些task_id扔进你可靠的队列.或者,如果必须,只需在脚本中处理它们.

当您开始处理某个项目时,必须先使用我们的乐观锁定方案将其锁定:

update tasks
   set locked_by = :worker_id     -- unique identifier for host + process + thread
     , locked_until = now + 5 minutes -- however this looks in your SQL langage
 where task_id = :task_id         -- you can lock multiple tasks here if necessary
   and (locked_until is null OR locked_until < now) -- only if it's not locked!
Run Code Online (Sandbox Code Playgroud)

现在,如果你真的更新了记录,你就拥有了锁.您现在可以启动特殊的转换逻辑.(掌声.这就是让你与所有其他任务经理不同的原因,对吧?)

如果成功,请更新任务状态,确保仍使用乐观锁定:

update tasks
   set state = :new_state
     , locked_until = null -- explicitly release the lock (an optimization, really)
 where task_id = :task_id
   and locked_by = :worker_id -- make sure we still own the lock
                              -- no-one really cares if we overstep our time-bounds
Run Code Online (Sandbox Code Playgroud)

多线程/流程优化

只有在多个线程或进程批量更新任务时(例如在cron作业中或轮询数据库),才能执行此操作!问题是他们每个人都会从数据库中获得类似的结果,然后争先恐后地锁定每一行.这是低效的,因为它会减慢数据库的速度,并且因为你的线程基本上什么都不做,只会减慢其他线程的速度.

因此,添加查询返回的结果数量限制并遵循此算法:

results = database.tasks_to_move_to_current_state :limit => BATCH_SIZE
while !results.empty
    results.shuffle! # make sure we're not in lock step with another worker
    contention_count = 0
    results.each do |task_id|
        if database.lock_task :task_id => task_id
           on_transition_to_current task_id
        else
           contention_count += 1
        end
        break if contention_count > MAX_CONTENTION_COUNT # too much contention!
    done
    results = database.tasks_to_move_to_current_state :limit => BATCH_SIZE
end
Run Code Online (Sandbox Code Playgroud)

反复折腾BATCH_SIZE,并MAX_CONTENTION_COUNT直到程序速度超快.


更新:

乐观锁定允许并行处理多个处理器.

通过锁定超时(通过locked_until字段),它允许在处理转换时失败.如果处理器发生故障,另一个处理器能够在超时后接收任务(上述代码中为5分钟).因此,重要的是a)只在你要处理任务时锁定任务; 和b)锁定任务完成任务需要多长时间以及慷慨的余地.

locked_by字段主要用于调试目的(这个进程/机器是什么?)locked_until如果数据库驱动程序返回更新的行数,则只有在一次更新一行时才有字段.


kpu*_*nam 3

对于中等大小的数据集,一种简单的解决方案是使用 SQL 数据库。每个待办事项记录应具有“state_id”、“current_at”和“late_at”字段。您可能可以省略“future_at”,除非您确实有四个状态。

这允许三种状态:

  1. 未来:当现在< current_at时
  2. 当前:当 current_at <= now < Late_at
  3. 迟到:当 Late_at <= now时

将状态存储为state_id(可以选择为名为“states”的查找表创建外键,其中1: Future, 2: Current, 3: Late)基本上是存储非规范化数据,这可以让您避免重新计算状态,因为它很少更改。

如果您实际上没有根据状态(例如... WHERE state_id = 1)查询待办事项记录或在状态更改时触发一些副作用(例如发送电子邮件),那么也许您不需要管理状态。如果您只是向用户显示待办事项列表并指出哪些事项迟到了,那么最便宜的实现甚至可能是在客户端进行计算。为了回答的目的,我假设你需要管理状态。

您有几个更新 state_id 的选项。我假设你正在执行约束current_at < late_at

  • 最简单的是更新每条记录:UPDATE todos SET state_id = CASE WHEN late_at <= NOW() THEN 3 WHEN current_at <= NOW() THEN 2 ELSE 1 END;

  • UPDATE todos SET state_id = 3 WHERE state_id <> 3 AND late_at <= NOW()您可能会通过(在一笔事务中) , UPDATE todos SET state_id = 2 WHERE state_id <> 2 AND NOW() < late_at AND current_at <= NOW(),等方式获得更好的性能UPDATE todos SET state_id = 1 WHERE state_id <> 1 AND NOW() < current_at。这可以避免检索不需要更新的行,但您需要在“late_at”和“future_at”上建立索引(您可以尝试对“state_id”建立索引,请参阅下面的注释)。您可以根据需要频繁运行这三个更新。

  • 上面的细微变化是首先获取记录的 ID,这样您就可以对已更改状态的待办事项执行某些操作。这看起来像SELECT id FROM todos WHERE state_id <> 3 AND late_at <= NOW() FOR UPDATE。然后你应该像这样进行更新UPDATE todos SET state_id = 3 WHERE id IN (:ids)。现在您仍然拥有 ID 供以后执行某些操作(例如,通过电子邮件发送通知“20 个任务已过期”)。

  • 为每个待办事项安排或排队更新作业(例如,在上午 10 点将其更新为“当前”,在晚上 11 点将其更新为“最新”)将导致大量计划作业,至少是待办事项数量的两倍,并且性能较差 - 每个待办事项计划作业仅更新一条记录。

  • 您可以安排批量更新,例如UPDATE state_id = 2 WHERE ID IN (1,2,3,4,5,...)预先计算出待办事项 ID 列表,该列表将在某个特定时间附近变为当前状态。由于多种原因,这在实践中可能不会很好地发挥作用。其中之一是一些待办事项current_atlate_at字段可能会在您安排更新后发生变化。

注意:通过索引“state_id”可能不会获得太多好处,因为它仅将数据集分为三组。对于查询规划器来说,这可能不足以考虑在诸如SELECT * FROM todos WHERE state_id = 1.

您没有讨论的这个问题的关键是已完成的待办事项会发生什么如果您将它们留在此待办事项表中,该表将无限增长,并且您的性能将随着时间的推移而下降。解决方案是将数据分区到两个单独的表中(例如“completed_todos”和“pending_todos”)。然后,您可以UNION在实际需要时连接两个表。