Sidekiq:确保队列中的所有作业都是唯一的

mah*_*off 11 ruby ruby-on-rails message-queue sidekiq

我有一些更新触发器,将作业推送到Sidekiq队列.因此,在某些情况下,可以有多个作业来处理同一个对象.

有几个独特的插件("中间件",独特的工作),它们没有太多记录,但它们似乎更像是节流器,以防止重复处理 ; 我想要的是一个阻止重复创建相同工作的节流器.这样,对象将始终以最新鲜的状态进行处理.是否有插件或技术?


更新:我没有时间制作中间件,但我最终得到了一个相关的清理功能,以确保队列是唯一的:https://gist.github.com/mahemoff/bf419c568c525f0af903

Pat*_*até 7

简单的客户端中间件怎么样?

module Sidekiq
  class UniqueMiddleware

    def call(worker_class, msg, queue_name, redis_pool)
      if msg["unique"]
        queue = Sidekiq::Queue.new(queue_name)
        queue.each do |job|
          if job.klass == msg['class'] && job.args == msg['args']
            return false
          end
        end
      end

      yield

    end
  end
end
Run Code Online (Sandbox Code Playgroud)

只需注册它

  Sidekiq.configure_client do |config|
    config.client_middleware do |chain|
      chain.add Sidekiq::UniqueMiddleware
    end
  end
Run Code Online (Sandbox Code Playgroud)

然后在你的工作中,只需unique: true在需要时设置sidekiq_options


blo*_*tto 3

我的建议是根据某些选择条件搜索先前安排的作业并删除,然后再安排新的作业。当我想要为特定对象和/或其方法之一执行单个计划作业时,这对我很有用。

在这种情况下的一些示例方法:

 find_jobs_for_object_by_method(klass, method)

  jobs = Sidekiq::ScheduledSet.new

  jobs.select { |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method
  }

end

##
# delete job(s) specific to a particular class,method,particular record
# will only remove djs on an object for that method
#
def self.delete_jobs_for_object_by_method(klass, method, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method  &&
        args[0] == id
  end.map(&:delete)

end

##
# delete job(s) specific to a particular class and particular record
# will remove any djs on that Object
#
def self.delete_jobs_for_object(klass, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        args[0] == id
  end.map(&:delete)

end
Run Code Online (Sandbox Code Playgroud)