如何从 ruby​​ 中实时创建的新线程异步收集结果

mes*_*600 1 ruby multithreading process thread-safety spawn

我想不断检查数据库中的表以获取要运行的命令。某些命令可能需要 4 分钟才能完成,大约需要 10 秒。

因此我想在线程中运行它们。所以每条记录都会创建新的线程,并且在创建线程后,记录被删除。

因为数据库查找 + 线程创建将在无限循环中运行,我如何从线程获取“响应”(线程将发出 shell 命令并获取我想阅读的响应代码)?

我想过创建两个无限循环的线程: - 第一个用于数据库查找 + 创建新线程 - 第二个用于......以某种方式读取线程结果并对每个响应采取行动

或者也许我应该使用 fork,或者 os 产生一个新进程?

7st*_*tud 5

您可以让每个线程将其结果推送到队列中,然后您的主线程可以从队列中读取。默认情况下,从队列中读取是一个阻塞操作,因此如果没有结果,您的代码将阻塞并等待读取。

http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html

下面是一个例子:

require 'thread'

jobs = Queue.new
results = Queue.new

thread_pool = []
pool_size = 5

(1..pool_size).each do |i|
  thread_pool << Thread.new do 
    loop do 
      job = jobs.shift #blocks waiting for a task
      break if job == "!NO-MORE-JOBS!"

      #Otherwise, do job...
      puts "#{i}...."
      sleep rand(1..5) #Simulate the time it takes to do a job
      results << "thread#{i} finished #{job}"  #Push some result from the job onto the Queue
      #Go back and get another task from the Queue
    end
  end
end


#All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
  'job1', 
  'job2', 
  'job3', 
  'job4', 
  'job5',
  'job6',
  'job7',
]

db_stuff.each do |job|
  jobs << job
end

#Threads are now attacking the Queue like hungry dogs.

pool_size.times do
  jobs << "!NO-MORE-JOBS!"
end

result_count = 0

loop do
  result = results.shift
  puts "result: #{result}"
  result_count +=1
  break if result_count == 7
end
Run Code Online (Sandbox Code Playgroud)