tol*_*uju 7 concurrency refactoring scala actor
试图了解如何用演员而不是线程来思考.我对以下用例感到有些困惑:
考虑一个系统,该系统具有创建工作的生产者流程(例如,通过从文件读取数据),以及消耗工作的许多工作流程(例如,通过解析数据并将其写入数据库).工作生产和消费的速度可能不同,系统应保持稳健.例如,如果工人无法跟上,生产者应该检测到这一点并最终减速或等待.
使用线程很容易实现:
val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
new Thread() {
override def run() = {
while (true) {
try {
// take next unit of work, waiting if necessary
val work = queue.take()
process(work)
}
catch {
case e:InterruptedException => return
}
}
}
}
}
// start the workers
workers.foreach(_.start())
while (producer.hasNext) {
val work = producer.next()
// add new unit of work, waiting if necessary
queue.put(work)
}
while (!queue.isEmpty) {
// wait until queue is drained
queue.wait()
}
// stop the workers
workers.foreach(_.interrupt())
Run Code Online (Sandbox Code Playgroud)
这个模型没有什么问题,我以前成功地使用过它.这个例子可能过于冗长,因为使用Executor或CompletionService可以很好地适应这个任务.但我喜欢演员抽象,并认为在许多情况下更容易推理.有没有办法使用actor重写这个例子,特别是确保没有缓冲区溢出(例如完整邮箱,丢弃的消息等)?
因为参与者“离线”处理消息(即消息的消费与消息的接收无关),所以很难看出如何精确模拟“生产者等待消费者赶上”。
我唯一能想到的是消费者向生产者演员请求工作(使用reply
):
case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
def act = {
prod ! MoreWorkPlease
loop {
react {
case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
}
}
}
}
class Producer extends Actor {
def act = loop {
react {
case MoreWorkPlease => reply(Work(getNextItem))
}
}
}
Run Code Online (Sandbox Code Playgroud)
当然,这并不完美,因为生产者不会“向前阅读”,只有在消费者准备好时才开始工作。用法如下:
val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
513 次 |
最近记录: |