Ruby中的线程安全枚举器

Eph*_*aim 7 ruby multithreading lazy-loading ruby-on-rails enumerator

TLDR:Ruby中是否存在线程安全版本的Enumerator类?


我正在做的事情:

我在Ruby On Rails应用程序中有一个方法,我想同时运行.该方法应该创建一个包含来自站点的报告的zip文件,其中zip中的每个文件都是PDF.从html到PDF的转换有点慢,因此需要多线程.

我的期望如何:

我想使用5个线程,所以我想我会在线程之间有一个共享的枚举器.每个线程都会从Enumerator中弹出一个值,然后运行do stuff.这就是我认为它会起作用的方式:

t = Zip::OutputStream::write_buffer do |z|
  mutex = Mutex.new
  gen = Enumerator.new{ |g|
    Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report|
      g.yield report
    end
  }
  5.times.map {
    Thread.new do
      begin
        loop do
          mutex.synchronize  do
            @report = gen.next
          end
          title = @report.title + "_" + @report.id.to_s
          title += ".pdf" unless title.end_with?(".pdf")
          pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]},
                                            :layout => false)).to_pdf
          mutex.synchronize  do
            z.put_next_entry(title)
            z.write(pdf)
          end
        end
      rescue StopIteration
        # do nothing
      end
    end
  }.each {|thread| thread.join }
end
Run Code Online (Sandbox Code Playgroud)

我尝试时发生了什么:

当我运行上面的代码时,我收到以下错误:

FiberError at /generate_report
fiber called across threads
Run Code Online (Sandbox Code Playgroud)

经过一些搜索,我发现这篇文章,建议我使用队列而不是枚举器,因为队列是线程安全的,而枚举器不是.虽然这对非Rails应用程序可能是合理的,但这对我来说是不切实际的.

为什么我不能只使用队列:

Rails 4 ActiveRecord的优点是它不会加载查询,直到它们被迭代.而且,如果你使用一种方法find_each来迭代它,它会以1000的批量进行,所以你永远不必一次在ram中存储整个表.我正在使用的查询结果Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]})很大.很大.而且我需要能够动态加载它,而不是像以下那样:

gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push))
Run Code Online (Sandbox Code Playgroud)

这会将整个查询加载到ram中.

最后问题是:

是否有一种线程安全的方法:

gen = Enumerator.new{ |g|
        Report.all.includes(...).find_each do |report|
          g.yield report
        end
}
Run Code Online (Sandbox Code Playgroud)

这样我就可以从gen多个线程中弹出数据,而不必将我的整个Report(和所有包含)表加载到ram中?

Uri*_*ssi 1

如果您在填充队列之前启动工作线程,它们将在您填充队列时开始消耗队列,并且因为根据经验 - 网络比 CPU 慢,所以每个批次应该(大部分)在队列被消耗时被消耗。下一批到达:

queue = Queue.new

t1 = Thread.new do
  while !queue.empty?
    p queue.pop(true)
    sleep(0.1)
  end
end
t2 = Thread.new do
  while !queue.empty?
    p queue.pop(true)
    sleep(0.1)
  end
end

(0..1000).map(&queue.method(:push))

t1.join
t2.join
Run Code Online (Sandbox Code Playgroud)

如果这仍然太慢,您可以选择使用,如果队列达到足够大的大小,SizedQueue它将阻塞:push

queue = SizedQueue.new(100)

t1 = Thread.new do
  while !queue.empty?
    p "#{queue.pop(true)} - #{queue.size}"
    sleep(0.1)
  end
end
t2 = Thread.new do
  while !queue.empty?
    p queue.pop(true)
    sleep(0.1)
  end
end
(0..300).map(&queue.method(:push))
t1.join
t2.join
Run Code Online (Sandbox Code Playgroud)