线程和队列

Pil*_*ard 27 ruby queue multithreading synchronization

我有兴趣知道实现基于线程的队列的最佳方法是什么.

例如:

我有10个动作,我想用4个线程执行.我想创建一个队列,其中所有10个动作都是线性放置的,并且用4个线程开始前4个动作,一旦一个线程完成执行,下一个将启动等 - 所以一次,线程的数量是4或小于4.

The*_*heo 30

标准库中有一个Queuethread.使用它你可以做这样的事情:

require 'thread'

queue = Queue.new
threads = []

# add work to the queue
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }
Run Code Online (Sandbox Code Playgroud)

我使用非阻塞标志弹出的原因是,在until queue.empty?pop和另一个线程之间可能已经弹出了队列,所以除非设置了非阻塞标志,否则我们可能会永远卡在该线路上.

如果你正在使用MRI,默认的Ruby解释器,请记住线程不是绝对并发的.如果您的工作受CPU限制,您也可以运行单线程.如果你有一些阻止IO的操作,你可能会得到一些并行性,但是YMMV.或者,您可以使用允许完全并发的解释器,例如jRuby或Rubinius.


Bil*_*ber 7

有一些宝石可以为你实现这种模式; 平行,桃子和我的叫threach(或jruby_threach在jruby下).它是#each的直接替代品,但允许您指定要运行的线程数,使用下面的SizedQueue来防止螺旋式失控.

所以...

(1..10).threach(4) {|i| do_my_work(i) }
Run Code Online (Sandbox Code Playgroud)

不要推我自己的东西; 有很多很好的实现可以让事情变得更容易.

如果您正在使用JRuby,jruby_threach那么实现起来要好得多 - Java只提供了更丰富的线程原始数据和数据结构.


Inv*_*ion 5

可执行的描述性示例:

require 'thread'

p tasks = [
    {:file => 'task1'},
    {:file => 'task2'},
    {:file => 'task3'},
    {:file => 'task4'},
    {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
    workers << Thread.new(n+1) do |my_n|
        while (task = tasks_queue.shift(true) rescue nil) do
            delay = rand(0)
            sleep delay
            task[:result] = "done by worker ##{my_n} (in #{delay})"
            p task
        end
    end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
Run Code Online (Sandbox Code Playgroud)


Cas*_*per 4

您可以使用线程池。对于此类问题,这是相当常见的模式。
http://en.wikipedia.org/wiki/Thread_pool_pattern

Github 似乎有一些你可以尝试的实现:
https://github.com/search ?type=Everything&language=Ruby&q=thread+pool