Sidekiq在处理大数据时处理重新排队

Don*_*nia 2 ruby-on-rails heroku idempotent mongodb sidekiq

请参阅下面的更新问题.

原始问题:

在我目前的Rails项目中,我需要解析大型xml/csv数据文件并将其保存到mongodb中.现在我使用这个步骤:

  1. 从用户接收上传的文件,将数据存储到mongodb
  2. 使用sidekiq在mongodb中执行数据的异步处理.
  3. 处理完成后,删除原始数据.

对于localhost中的中小型数据,上述步骤运行良好.但是在heroku中,我使用hirefire来动态地缩放工作人员的dyno.当工人仍然处理大数据时,雇佣工作会看到空队列并缩小工人dyno.这会向进程发送kill信号,并使进程处于不完整状态.

我正在寻找一种更好的解析方法,允许解析过程随时被杀死(在接收kill信号时保存当前状态),并允许进程重新排队.

现在我正在使用Model.delay.parse_file,它不会重新排队.

UPDATE

在阅读了sidekiq wiki之后,我找到了关于工作控制的文章.任何人都可以解释代码,它是如何工作的,以及它在接收SIGTERM信号和工作人员重新排队时如何保持它的状态?

有没有其他方法来处理作业终止,保存当前状态,并从最后一个位置继续?

谢谢,

nor*_*ort 6

可能更容易解释过程和高级步骤,给出一个示例实现(我使用的一个简化版本),然后讨论throw和catch:

  1. 使用递增索引插入原始csv行(以便稍后可以从特定行/索引恢复)
  2. 处理CSV停止每个"块"以检查作业是否完成,方法是检查是否Sidekiq::Fetcher.done?返回true
  3. 当提取器是done?,将当前处理的项目的索引存储在用户上并返回,以便将作业completes和控件返回到sidekiq.
  4. 请注意,如果作业在短暂超时(默认为20秒)后仍在运行,则作业将被终止.
  5. 然后当工作再次运行时,从上次离开的地方开始(或从0开始)

例:

    class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)

        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
        items.each_with_index do |item, i|
          if (i+1 % 100) == 0 && Sidekiq::Fetcher.done?
            user.update(last_csv_index: item.index)

            return
          end

          # Process the item as normal
        end
      end
    end
Run Code Online (Sandbox Code Playgroud)

上面的类确保我们检查每个100项未完成的提取器(如果已启动关闭的代理),并结束该作业的执行.在执行结束之前index,我们会使用已处理的最后一个更新用户,以便我们可以从下次停止的地方开始.

throw catch是一种实现上述功能的方法,有点清洁(也许)但有点像使用Fibers,很好的概念,但很难包裹你的头.从技术上讲,投掷捕获更像是goto,而不是大多数人通常都习惯的.

编辑

此外,您无法调用Sidekiq::Fetcher.done?并记录last_csv_index每行或每行处理的行,如果您的工作人员在没有机会记录的情况下被杀,last_csv_index您仍然可以继续"关闭"到您停止的位置.