Don*_*nia 2 ruby-on-rails heroku idempotent mongodb sidekiq
请参阅下面的更新问题.
原始问题:
在我目前的Rails项目中,我需要解析大型xml/csv数据文件并将其保存到mongodb中.现在我使用这个步骤:
对于localhost中的中小型数据,上述步骤运行良好.但是在heroku中,我使用hirefire来动态地缩放工作人员的dyno.当工人仍然处理大数据时,雇佣工作会看到空队列并缩小工人dyno.这会向进程发送kill信号,并使进程处于不完整状态.
我正在寻找一种更好的解析方法,允许解析过程随时被杀死(在接收kill信号时保存当前状态),并允许进程重新排队.
现在我正在使用Model.delay.parse_file,它不会重新排队.
UPDATE
在阅读了sidekiq wiki之后,我找到了关于工作控制的文章.任何人都可以解释代码,它是如何工作的,以及它在接收SIGTERM信号和工作人员重新排队时如何保持它的状态?
有没有其他方法来处理作业终止,保存当前状态,并从最后一个位置继续?
谢谢,
可能更容易解释过程和高级步骤,给出一个示例实现(我使用的一个简化版本),然后讨论throw和catch:
Sidekiq::Fetcher.done?返回truedone?,将当前处理的项目的索引存储在用户上并返回,以便将作业completes和控件返回到sidekiq.例:
class UserCSVImportWorker
include Sidekiq::Worker
def perform(user_id)
user = User.find(user_id)
items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
items.each_with_index do |item, i|
if (i+1 % 100) == 0 && Sidekiq::Fetcher.done?
user.update(last_csv_index: item.index)
return
end
# Process the item as normal
end
end
end
Run Code Online (Sandbox Code Playgroud)
上面的类确保我们检查每个100项未完成的提取器(如果已启动关闭的代理),并结束该作业的执行.在执行结束之前index,我们会使用已处理的最后一个更新用户,以便我们可以从下次停止的地方开始.
throw catch是一种实现上述功能的方法,有点清洁(也许)但有点像使用Fibers,很好的概念,但很难包裹你的头.从技术上讲,投掷捕获更像是goto,而不是大多数人通常都习惯的.
编辑
此外,您无法调用Sidekiq::Fetcher.done?并记录last_csv_index每行或每行处理的行,如果您的工作人员在没有机会记录的情况下被杀,last_csv_index您仍然可以继续"关闭"到您停止的位置.
| 归档时间: |
|
| 查看次数: |
1085 次 |
| 最近记录: |