Chr*_*sen 6 ruby mutex semaphore redis sidekiq
我有一个rails应用程序,可以从多个IMAP帐户中获取大量电子邮件.
2个问题:
我的代码:
class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable
tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|
if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end
class ImapJob
include Sidekiq::Worker
def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
if s.lock
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end
end
end
Run Code Online (Sandbox Code Playgroud)
Redis子类并重载blpop以接受-1非阻塞使用lpop.Redis的-旗语通话@redis.blpop在Redis::Semaphore#lock.虽然您可以重载lock使用的方法@redis.lpop,但更简单的方法是将自定义实例传递Redis给信号量.
将以下内容放在lib您的rails应用程序中并在您的需要中config/initializers/sidekiq.rb(或者根据您的喜好进行加载以下类).
class NonBlockingRedis < Redis
def blpop(key, timeout)
if timeout == -1
result = lpop(key)
return result if result.nil?
return [key, result]
else
super(key, timeout)
end
end
end
Run Code Online (Sandbox Code Playgroud)
无论何时调用Redis::Semaphore.new,都要传递一个:redis具有NonBlockingRedis该类新实例的密钥.
调用s.lock与-1作为参数使用lpop,而不是blpop.
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end
Run Code Online (Sandbox Code Playgroud)
sidekiq_options retry: false在您的工人类中使用应该有效,请参阅下面的示例.在您的问题中,您没有指定哪个工作者遇到了在重试队列中结束的作业的问题.自从FetchMailsJobs结束排队ImapJob作业以来,前者中的例外可能会使其看起来ImapJob正在重新排队.
使用信号量锁定,将工作包装在begin rescue ensure块中也是一个好主意.
class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable
sidekiq_options retry: false
tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|
if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end
class ImapJob
include Sidekiq::Worker
sidekiq_options retry: false
def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock - 1
begin
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
rescue => e
# ignore; do nothing
ensure
s.unlock
end
end
end
end
Run Code Online (Sandbox Code Playgroud)
有关详细信息,请参阅sidekiq高级选项:工作人员.