如何为 ActiveRecord 模型对象查找关联的 Resque 作业?

Str*_*ine 2 ruby ruby-on-rails resque redis

我需要能够找到模型对象的排队和/或工作作业和/或失败的作业,例如,当模型对象被销毁时,我们想要找到所有并决定不删除或销毁作业(有条件地) .

在我重新发明轮子之前,是否有推荐的方法来做到这一点?

例子:

如果您想创建一个before_destroy回调,当对象被销毁(排队和失败的作业)时销毁所有作业,并且仅在没有工作作业时销毁

我想为这个示例用例做的一些伪代码:

报表模型

class Report < ActiveRecord::Base
  before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs

  def check_if_working_jobs
    # find all working jobs related to this report object
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty?
  end

  def destroy_queued_and_failed_jobs
    # find all jobs related to this report object
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # destroy/remove all jobs found
    (queued_jobs + failed_jobs).each do |job| 
       # destroy the job here ... commands?
    end

  end
end
Run Code Online (Sandbox Code Playgroud)

用于 resque / redis 支持的作业的报告处理工作类

class ProcessReportWorker

  # find the jobs by report id which is one of the arguments for the job?
  # envisioned as separate methods so they can be used independently as needed

  def self.find_queued_jobs_by_report_id(id)
     # parse all jobs in all queues to find based on the report id argument?
  end 

  def self.find_working_jobs_by_report_id(id)
     # parse all jobs in working queues to find based on the report id argument? 
  end 

  def self.find_failed_jobs_by_report_id(id)
     # parse all jobs in failed queue to find based on the report id argument? 
  end 
end
Run Code Online (Sandbox Code Playgroud)

这种方法是否符合需要发生的情况?

上面缺少哪些部分可以通过模型对象 id 查找排队或正在工作的作业,然后将其销毁?

是否已经有方法可以通过我在文档或搜索中遗漏的关联模型对象 ID 来查找和/或销毁?

更新:修改了使用示例,仅使用working_jobs 作为检查我们是否应该删除与建议我们也将尝试删除working_jobs 的方法。(因为删除工作作业比简单地删除 redis 键条目更复杂)

Str*_*ine 6

这里很安静,没有任何回应,所以我设法花一天时间按照我在问题中指出的路径来解决这个问题。可能有更好的解决方案或其他可用方法,但到目前为止,这似乎可以完成工作。如果这里使用的方法有更好的选择,或者可以进一步改进,请随时发表评论。

这里的总体方法是您需要搜索所有作业(排队、正在工作、失败),并仅过滤出classqueue您在 args 数组的正确索引位置中查找的相关且与您正在寻找的对象记录 ID 匹配的作业. 例如(在确认classqueue匹配之后)如果参数位置0是对象id所在的位置,那么您可以测试以查看是否args[0]匹配对象id。

本质上,如果满足以下条件,则作业与对象 ID 相关联: job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id

  • 排队作业:要查找所有排队作业,您需要收集所有带有键名的 redis 条目,queue:#{@queue}其中 @queue 是您的工作类正在使用的队列的名称。如果您为特定的工作类使用多个队列,则通过循环访问多个队列进行相应的修改。Resque.redis.lrange("queue:#{@queue}",0,-1)
  • 失败的作业:要查找所有排队的作业,您需要收集所有带有命名键的 redis 条目failed(除非您使用多个失败队列或其他非默认设置)。Resque.redis.lrange("failed",0,-1)
  • 工作工作:要查找您可以使用的所有工作工作,Resque.workers 其中包含所有工作人员和正在运行的工作的数组。Resque.workers.map(&:job)
  • 作业:上述每个列表中的每个作业都将是一个编码散列。您可以使用 将作业解码为 ruby​​ 哈希Resque.decode(job)
  • 类和参数:对于排队作业classargs键是job["class"]job["args"]。对于失败的工作的工作,这些是job["payload"]["class"]job["payload"]["args"]
  • 队列:对于找到的每个失败工作的作业,队列将为job["queue"]。在测试对象 id 的 args 列表之前,您只需要与class和匹配的作业queue。您的排队作业列表将仅限于您收集的队列。

以下是用于查找(和删除)与示例模型对象(报告)相关联的作业的示例工作类和模型方法。

用于 resque / redis 支持的作业的报告处理工作类

class ProcessReportWorker
  # queue name
  @queue = :report_processing
  # tell the worker class where the report id is in the arguments list
  REPORT_ID_ARGS_INDEX = 0 

  # <snip> rest of class, not needed here for this answer

  # find jobs methods - find by report id (report is the 'associated' object)

  def self.find_queued_jobs_by_report_id report_id
    queued_jobs(@queue).select do |job|
      is_job_for_report? :queued, job, report_id
    end
  end

  def self.find_failed_jobs_by_report_id report_id
    failed_jobs.select do |job|
      is_job_for_report? :failed, job, report_id
    end
  end

  def self.find_working_jobs_by_report_id report_id
    working_jobs.select do |worker,job|
      is_job_for_report? :working, job, report_id
    end
  end

  # association test method - determine if this job is associated      

  def self.is_job_for_report? state, job, report_id
    attributes = job_attributes(state, job)
    attributes[:klass] == self.name && 
      attributes[:queue] == @queue && 
        attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id
  end

  # remove jobs methods

  def self.remove_failed_jobs_by_report_id report_id
    find_failed_jobs_by_report_id(report_id).each do |job|
      Resque::Failure.remove(job["index"]) 
    end
  end

  def self.remove_queued_jobs_by_report_id report_id
    find_queued_jobs_by_report_id(report_id).each do |job|
      Resque::Job.destroy(@queue,job["class"],*job["args"])
    end
  end

  # reusable methods - these methods could go elsewhere and be reusable across worker classes

  # job attributes method

  def self.job_attributes(state, job)
    if state == :queued && job["args"].present?
      args = job["args"]
      klass = job["class"]
    elsif job["payload"] && job["payload"]["args"].present?
      args = job["payload"]["args"]
      klass = job["payload"]["class"] 
    else
      return {args: nil, klass: nil, queue: nil}
    end
    {args: args, klass: klass, queue: job["queue"]}
  end

  # jobs list methods

  def self.queued_jobs queue
    Resque.redis.lrange("queue:#{queue}", 0, -1)
      .collect do |job| 
        job = Resque.decode(job)
        job["queue"] = queue # for consistency only
        job
      end
  end

  def self.failed_jobs
    Resque.redis.lrange("failed", 0, -1)
      .each_with_index.collect do |job,index|
        job = Resque.decode(job)
        job["index"] = index # required if removing
        job
      end
  end

  def self.working_jobs
    Resque.workers.zip(Resque.workers.map(&:job))
      .reject { |w, j| w.idle? || j['queue'].nil? }
  end

end
Run Code Online (Sandbox Code Playgroud)

那么报表模型的使用示例就变成了

class Report < ActiveRecord::Base
  before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs

  def check_if_working_jobs
    # find all working jobs related to this report object
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty?
  end

  def remove_queued_and_failed_jobs
    # find all jobs related to this report object
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # extra code and conditionals here for example only as all that is really 
    # needed is to call the remove methods without first finding or checking

    unless queued_jobs.empty?
      ProcessReportWorker.remove_queued_jobs_by_report_id(self.id)
    end

    unless failed_jobs.empty?
      ProcessReportWorker.remove_failed_jobs_by_report_id(self.id)
    end

  end
end
Run Code Online (Sandbox Code Playgroud)

如果为worker类使用多个队列或者有多个失败队列,则需要修改解决方案。此外,redis还使用了故障后端。如果使用不同的故障后端,则可能需要进行更改。