使用delayed_job进行轮询

hol*_*den 33 ruby-on-rails backgroundworker push-notification delayed-job ruby-on-rails-3

我有一个过程,通常需要几秒钟才能完成,因此我尝试使用delayed_job来异步处理它.工作本身运作正常,我的问题是如何轮询工作以确定是否已完成.

我可以通过简单地将它分配给变量来获取delayed_job的id:

job = Available.delay.dosomething(:var => 1234)

+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| id   | priority | attempts | handler    | last_error | run_at      | locked_at | failed_at | locked_by | created_at | updated_at  |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| 4037 | 0        | 0        | --- !ru... |            | 2011-04-... |           |           |           | 2011-04... | 2011-04-... |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
Run Code Online (Sandbox Code Playgroud)

但是一旦完成作业,它就会删除它,并且搜索完成的记录会返回错误:

@job=Delayed::Job.find(4037)

ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037

@job= Delayed::Job.exists?(params[:id])
Run Code Online (Sandbox Code Playgroud)

我是否需要改变这一点,并推迟删除完整的记录?我不知道我怎么能得到它的状态通知.或者正在查看死记录作为完成证明吗?其他人面对类似的事情?

Max*_*yak 45

让我们从API开始吧.我想要像下面这样的东西.

@available.working? # => true or false, so we know it's running
@available.finished? # => true or false, so we know it's finished (already ran)
Run Code Online (Sandbox Code Playgroud)

现在让我们写下这份工作.

class AwesomeJob < Struct.new(:options)

  def perform
    do_something_with(options[:var])
  end

end
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.我们有一份工作.现在让我们编写将其排列的逻辑.由于Available是负责这项工作的模型,让我们教它如何开始这项工作.

class Available < ActiveRecord::Base

  def start_working!
    Delayed::Job.enqueue(AwesomeJob.new(options))
  end

  def working?
    # not sure what to put here yet
  end

  def finished?
    # not sure what to put here yet
  end

end
Run Code Online (Sandbox Code Playgroud)

那么我们如何知道这项工作是否有效?有几种方法,但在rails中,我觉得正确的是,当我的模型创建某些东西时,它通常与那些东西相关联.我们如何联想?在数据库中使用id.让我们job_id在可用模型上添加一个.

虽然我们正在努力,但我们怎么知道这项工作因为已经完成而无法工作,或者因为它尚未启动?一种方法是实际检查作业实际上做了什么.如果它创建了一个文件,请检查文件是否存在.如果计算了一个值,请检查结果是否已写入.有些工作并不容易检查,因为他们的工作可能没有明确的可验证结果.对于这种情况,您可以在模型中使用标志或时间戳.假设这是我们的情况,让我们添加一个job_finished_at时间戳来区分尚未运行的作业和已经完成的作业.

class AddJobIdToAvailable < ActiveRecord::Migration
  def self.up
    add_column :available, :job_id, :integer
    add_column :available, :job_finished_at, :datetime
  end

  def self.down
    remove_column :available, :job_id
    remove_column :available, :job_finished_at
  end
end
Run Code Online (Sandbox Code Playgroud)

好的.所以,现在Available,只要我们通过修改start_working!方法排队工作,我们就会立即与其工作联系起来.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options))
  update_attribute(:job_id, job.id)
end
Run Code Online (Sandbox Code Playgroud)

大.在这一点上,我可以写belongs_to :job,但我们并不真的需要.

所以现在我们知道如何编写working?方法,这很简单.

def working?
  job_id.present?
end
Run Code Online (Sandbox Code Playgroud)

但是我们如何标记完成的工作呢?没有人知道工作比工作本身更好.所以让我们available_id进入工作(作为选项之一)并在工作中使用它.为此,我们需要修改start_working!传递id 的方法.

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id))
  update_attribute(:job_id, job.id)
end
Run Code Online (Sandbox Code Playgroud)

我们应该将逻辑添加到作业中,以便job_finished_at在完成时更新我们的时间戳.

class AwesomeJob < Struct.new(:options)

  def perform
    available = Available.find(options[:available_id])
    do_something_with(options[:var])

    # Depending on whether you consider an error'ed job to be finished
    # you may want to put this under an ensure. This way the job
    # will be deemed finished even if it error'ed out.
    available.update_attribute(:job_finished_at, Time.current)
  end

end
Run Code Online (Sandbox Code Playgroud)

有了这个代码,我们知道如何编写我们的finished?方法.

def finished?
  job_finished_at.present?
end
Run Code Online (Sandbox Code Playgroud)

我们已经完成了.现在我们可以简单地进行轮询@available.working?,@available.finished?而且,通过检查,您可以方便地知道为您的可用创建了哪个确切的作业@available.job_id.你可以轻松地将它变成一个真正的联想belongs_to :job.


hol*_*den 14

我最终使用了Delayed_Job和after(job)回调的组合,它使用与创建的作业相同的ID填充memcached对象.这样,我最小化了数据库询问作业状态的次数,而不是轮询memcached对象.它包含我完成的作业所需的整个对象,所以我甚至没有往返请求.我从github的一篇文章中得到了这个想法,他们做了几乎相同的事情.

https://github.com/blog/467-smart-js-polling

并使用jquery插件进行轮询,轮询次数较少,并在经过一定次数的重试后放弃

https://github.com/jeremyw/jquery-smart-poll

似乎工作得很好.

 def after(job)
    prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a
    Rails.cache.fetch(job.id) do
      bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices)
    end
  end
Run Code Online (Sandbox Code Playgroud)


Rom*_*man 13

我认为最好的方法是使用delayed_job中可用的回调.这些是:成功,:错误和:之后.所以你可以在你的模型中添加一些代码:

class ToBeDelayed
  def perform
    # do something
  end

  def after(job)
    # do something
  end
end
Run Code Online (Sandbox Code Playgroud)

因为如果你坚持使用obj.delayed.method,那么你将不得不修补Delayed :: PerformableMethod并在after那里添加方法.恕我直言,它远比轮询某些可能甚至特定于后端的值更好(例如,ActiveRecord与Mongoid).


Mik*_*cic 5

实现此目的的最简单方法是将您的轮询操作更改为类似于以下内容:

def poll
  @job = Delayed::Job.find_by_id(params[:job_id])

  if @job.nil?
    # The job has completed and is no longer in the database.
  else
    if @job.last_error.nil?
      # The job is still in the queue and has not been run.
    else
      # The job has encountered an error.
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

为什么这样做?当Delayed::Job运行从队列中的作业,它从数据库中删除它如果成功.如果作业失败,则记录将保留在队列中以便稍后再次运行,并且该last_error属性将设置为遇到的错误.使用上面的两个功能,您可以检查已删除的记录以查看它们是否成功.

上述方法的好处是:

  • 您将获得原始帖子中要查找的轮询效果
  • 使用简单的逻辑分支,如果处理作业时出错,您可以向用户提供反馈

您可以通过执行以下操作将此功能封装在模型方法中:

# Include this in your initializers somewhere
class Queue < Delayed::Job
  def self.status(id)
    self.find_by_id(id).nil? ? "success" : (job.last_error.nil? ? "queued" : "failure")
  end
end

# Use this method in your poll method like so:
def poll
    status = Queue.status(params[:id])
    if status == "success"
      # Success, notify the user!
    elsif status == "failure"
      # Failure, notify the user!
    end
end
Run Code Online (Sandbox Code Playgroud)