Pil*_*ard 27 ruby queue multithreading synchronization
我有兴趣知道实现基于线程的队列的最佳方法是什么.
例如:
我有10个动作,我想用4个线程执行.我想创建一个队列,其中所有10个动作都是线性放置的,并且用4个线程开始前4个动作,一旦一个线程完成执行,下一个将启动等 - 所以一次,线程的数量是4或小于4.
The*_*heo 30
标准库中有一个Queue
类thread
.使用它你可以做这样的事情:
require 'thread'
queue = Queue.new
threads = []
# add work to the queue
queue << work_unit
4.times do
threads << Thread.new do
# loop until there are no more things to do
until queue.empty?
# pop with the non-blocking flag set, this raises
# an exception if the queue is empty, in which case
# work_unit will be set to nil
work_unit = queue.pop(true) rescue nil
if work_unit
# do work
end
end
# when there is no more work, the thread will stop
end
end
# wait until all threads have completed processing
threads.each { |t| t.join }
Run Code Online (Sandbox Code Playgroud)
我使用非阻塞标志弹出的原因是,在until queue.empty?
pop和另一个线程之间可能已经弹出了队列,所以除非设置了非阻塞标志,否则我们可能会永远卡在该线路上.
如果你正在使用MRI,默认的Ruby解释器,请记住线程不是绝对并发的.如果您的工作受CPU限制,您也可以运行单线程.如果你有一些阻止IO的操作,你可能会得到一些并行性,但是YMMV.或者,您可以使用允许完全并发的解释器,例如jRuby或Rubinius.
有一些宝石可以为你实现这种模式; 平行,桃子和我的叫threach
(或jruby_threach
在jruby下).它是#each的直接替代品,但允许您指定要运行的线程数,使用下面的SizedQueue来防止螺旋式失控.
所以...
(1..10).threach(4) {|i| do_my_work(i) }
Run Code Online (Sandbox Code Playgroud)
不要推我自己的东西; 有很多很好的实现可以让事情变得更容易.
如果您正在使用JRuby,jruby_threach
那么实现起来要好得多 - Java只提供了更丰富的线程原始数据和数据结构.
可执行的描述性示例:
require 'thread'
p tasks = [
{:file => 'task1'},
{:file => 'task2'},
{:file => 'task3'},
{:file => 'task4'},
{:file => 'task5'}
]
tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}
# run workers
workers_count = 3
workers = []
workers_count.times do |n|
workers << Thread.new(n+1) do |my_n|
while (task = tasks_queue.shift(true) rescue nil) do
delay = rand(0)
sleep delay
task[:result] = "done by worker ##{my_n} (in #{delay})"
p task
end
end
end
# wait for all threads
workers.each(&:join)
# output results
puts "all done"
p tasks
Run Code Online (Sandbox Code Playgroud)
您可以使用线程池。对于此类问题,这是相当常见的模式。
http://en.wikipedia.org/wiki/Thread_pool_pattern
Github 似乎有一些你可以尝试的实现:
https://github.com/search ?type=Everything&language=Ruby&q=thread+pool