如何最好地保持作业队列清理重试/重复作业(使用sidekiq和redis-semaphore)

Chr*_*sen 6 ruby mutex semaphore redis sidekiq

我有一个rails应用程序,可以从多个IMAP帐户中获取大量电子邮件.

  • 我用sidekiq来处理这些工作.
  • 我使用sidetiq安排工作.
  • 我使用redis-semaphore来确保同一用户的重复工作不会相互偶然发生.

2个问题:

  • 1:当一个工作命中"if s.lock"时,redis-semaphore将其暂停,直到所有先前的工作完成.我需要取消工作而不是排队.
  • 2:如果在作业期间引发异常,导致崩溃,则sidekiq会将作业重新放入队列进行重试.我需要取消工作而不是排队.将"sidekiq_options:retry => false"放入代码似乎没有什么区别.

我的代码:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

pot*_*lad 6

1.创建一个Redis子类并重载blpop以接受-1非阻塞使用lpop.

Redis的-旗语通话@redis.blpopRedis::Semaphore#lock.虽然您可以重载lock使用的方法@redis.lpop,但更简单的方法是将自定义实例传递Redis给信号量.

将以下内容放在lib您的rails应用程序中并在您的需要中config/initializers/sidekiq.rb(或者根据您的喜好进行加载以下类).

class NonBlockingRedis < Redis
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil?
      return [key, result]
    else
      super(key, timeout)
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

无论何时调用Redis::Semaphore.new,都要传递一个:redis具有NonBlockingRedis该类新实例的密钥.

调用s.lock-1作为参数使用lpop,而不是blpop.

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end
Run Code Online (Sandbox Code Playgroud)

2. sidekiq_options retry: false在您的工人类中使用应该有效,请参阅下面的示例.

在您的问题中,您没有指定哪个工作者遇到了在重试队列中结束的作业的问题.自从FetchMailsJobs结束排队ImapJob作业以来,前者中的例外可能会使其看起来ImapJob正在重新排队.

使用信号量锁定,将工作包装在begin rescue ensure块中也是一个好主意.

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock - 1
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # ignore; do nothing
      ensure
        s.unlock
      end
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

有关详细信息,请参阅sidekiq高级选项:工作人员.