Eph*_*aim 7 ruby multithreading lazy-loading ruby-on-rails enumerator
我在Ruby On Rails应用程序中有一个方法,我想同时运行.该方法应该创建一个包含来自站点的报告的zip文件,其中zip中的每个文件都是PDF.从html到PDF的转换有点慢,因此需要多线程.
我想使用5个线程,所以我想我会在线程之间有一个共享的枚举器.每个线程都会从Enumerator中弹出一个值,然后运行do stuff.这就是我认为它会起作用的方式:
t = Zip::OutputStream::write_buffer do |z|
mutex = Mutex.new
gen = Enumerator.new{ |g|
Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report|
g.yield report
end
}
5.times.map {
Thread.new do
begin
loop do
mutex.synchronize do
@report = gen.next
end
title = @report.title + "_" + @report.id.to_s
title += ".pdf" unless title.end_with?(".pdf")
pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]},
:layout => false)).to_pdf
mutex.synchronize do
z.put_next_entry(title)
z.write(pdf)
end
end
rescue StopIteration
# do nothing
end
end
}.each {|thread| thread.join }
end
Run Code Online (Sandbox Code Playgroud)
当我运行上面的代码时,我收到以下错误:
FiberError at /generate_report
fiber called across threads
Run Code Online (Sandbox Code Playgroud)
经过一些搜索,我发现这篇文章,建议我使用队列而不是枚举器,因为队列是线程安全的,而枚举器不是.虽然这对非Rails应用程序可能是合理的,但这对我来说是不切实际的.
Rails 4 ActiveRecord的优点是它不会加载查询,直到它们被迭代.而且,如果你使用一种方法find_each来迭代它,它会以1000的批量进行,所以你永远不必一次在ram中存储整个表.我正在使用的查询结果Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]})很大.很大.而且我需要能够动态加载它,而不是像以下那样:
gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push))
Run Code Online (Sandbox Code Playgroud)
这会将整个查询加载到ram中.
是否有一种线程安全的方法:
gen = Enumerator.new{ |g|
Report.all.includes(...).find_each do |report|
g.yield report
end
}
Run Code Online (Sandbox Code Playgroud)
这样我就可以从gen多个线程中弹出数据,而不必将我的整个Report(和所有包含)表加载到ram中?
如果您在填充队列之前启动工作线程,它们将在您填充队列时开始消耗队列,并且因为根据经验 - 网络比 CPU 慢,所以每个批次应该(大部分)在队列被消耗时被消耗。下一批到达:
queue = Queue.new
t1 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
t2 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
(0..1000).map(&queue.method(:push))
t1.join
t2.join
Run Code Online (Sandbox Code Playgroud)
如果这仍然太慢,您可以选择使用,如果队列达到足够大的大小,SizedQueue它将阻塞:push
queue = SizedQueue.new(100)
t1 = Thread.new do
while !queue.empty?
p "#{queue.pop(true)} - #{queue.size}"
sleep(0.1)
end
end
t2 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
(0..300).map(&queue.method(:push))
t1.join
t2.join
Run Code Online (Sandbox Code Playgroud)