Ruby同步:如何使线程以适当的顺序一个接一个地工作?

Rom*_*ian 3 ruby multithreading synchronization mutex monitor

我的问题是我不知道如何使用Ruby同步多个线程。该任务是创建六个线程并立即启动它们。他们都应该puts "Thread 1" Hi"按我需要的顺序依次进行一些工作(例如)。

我已经尝试过使用Mutex,Monitor和Condition Variable,但是它们都以随机顺序工作。有人可以解释如何实现我的目标吗?

经过一段时间与Mutex和Condition Variable的斗争后,我实现了我的目标。这段代码有点混乱,我故意不使用循环来获得“更清晰的视图”。

cv = ConditionVariable.new
mutex = Mutex.new

mutex2 = Mutex.new
cv2 = ConditionVariable.new

mutex3 = Mutex.new
cv3 = ConditionVariable.new

mutex4 = Mutex.new
cv4 = ConditionVariable.new

mutex5 = Mutex.new
cv5 = ConditionVariable.new

mutex6 = Mutex.new
cv6 = ConditionVariable.new



Thread.new do
  mutex.synchronize {
    puts 'First: Hi'
    cv.wait(mutex)
    puts 'First: Bye'
    #cv.wait(mutex)
    cv.signal
    puts 'First: One more time'
  }

end

Thread.new do
  mutex.synchronize {
    puts 'Second: Hi'
    cv.signal
    cv.wait(mutex)
    puts 'Second:Bye'
    cv.signal
  }

  mutex2.synchronize {
    puts  'Second: Starting third'
    cv2.signal

  }
end

Thread.new do
  mutex2.synchronize {
    cv2.wait(mutex2)
    puts 'Third: Hi'
  }

  mutex3.synchronize {
    puts 'Third: Starting forth'
    cv3.signal
  }
end

Thread.new do
  mutex3.synchronize {
    cv3.wait(mutex3)
    puts 'Forth: Hi'
  }

  mutex4.synchronize {
    puts 'Forth: Starting fifth'
    cv4.signal
  }
end

Thread.new do
  mutex4.synchronize {
    cv4.wait(mutex4)
    puts 'Fifth: Hi'
  }

  mutex5.synchronize {
    puts 'Fifth: Starting sixth'
    cv5.signal
  }
end

Thread.new {
  mutex5.synchronize {
    cv5.wait(mutex5)
    puts 'Sixth:Hi'
  }
}

sleep 2
Run Code Online (Sandbox Code Playgroud)

Way*_*rad 5

使用队列作为PV信号灯

您可以滥用Queue,将其像传统的PV信号灯一样使用。为此,您创建一个Queue实例:

require 'thread'
...
sem = Queue.new
Run Code Online (Sandbox Code Playgroud)

当线程需要等待时,它将调用Queue#deq

# waiting thread
sem.deq
Run Code Online (Sandbox Code Playgroud)

当其他某个线程想要解除阻塞正在等待的线程时,它将某些东西(任何东西)推送到队列中:

# another thread that wants to unblock the waiting thread
sem.enq :go
Run Code Online (Sandbox Code Playgroud)

工人阶级

这是一个使用Queue同步其开始和停止的工作程序类:

class Worker

  def initialize(worker_number)  
    @start = Queue.new
    Thread.new do
      @start.deq
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end

  def start
    @start.enq :start
  end

  def when_done(&block)
    @when_done = block
  end

end
Run Code Online (Sandbox Code Playgroud)

构造后,工作程序会创建一个线程,但是该线程然后在@start队列中等待。直到调用#start,线程才会解除阻塞。

完成后,线程将执行已调用#when_done的块。我们将在短时间内看到如何使用它。

创造工人

首先,让我们确保如果有任何线程引发异常,我们将对其进行了解:

Thread.abort_on_exception = true
Run Code Online (Sandbox Code Playgroud)

我们将需要六名工人:

workers = (1..6).map { |i| Worker.new(i) }
Run Code Online (Sandbox Code Playgroud)

告诉每个工人完成后该怎么做

这是#when_done起作用的地方:

workers.each_cons(2) do |w1, w2|
  w1.when_done { w2.start }
end
Run Code Online (Sandbox Code Playgroud)

这轮流带动每对工人。除最后一个工人外,每个工人被告知,完成时应在其之后启动该工人。剩下的只有最后一个工人。完成后,我们希望它通知该线程:

all_done = Queue.new
workers.last.when_done { all_done.enq :done }
Run Code Online (Sandbox Code Playgroud)

我们走吧!

现在剩下的就是启动第一个线程:

workers.first.start
Run Code Online (Sandbox Code Playgroud)

并等待最后一个线程完成:

all_done.deq
Run Code Online (Sandbox Code Playgroud)

输出:

Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6
Run Code Online (Sandbox Code Playgroud)