阻止Akka Actors中的呼叫

Ays*_*gma 30 concurrency multithreading scala actor akka

作为一个新手,我试图了解演员的工作方式.而且,从文档中,我认为我理解actor是以同步模式执行的对象,并且actor执行可以包含阻塞/同步方法调用,例如db请求

但是,我不明白的是,如果你编写一个内部有一些阻塞调用的actor(比如阻塞查询执行),它会搞乱整个线程池(从某种意义上说cpu利用率会下降等等) ), 对 ?我的意思是,根据我的理解,如果/当演员进行阻止调用时,JVM无法理解它是否可以将该线程切换到其他人.

因此,考虑到并发的性质,演员不应该做任何阻塞调用吗?

如果是这种情况,那么建议的非阻塞/异步调用方法是什么,让我们说一个Web服务调用可以获取内容并在该请求完成时向另一个actor发送消息?我们应该只使用演员内部的东西:

未来地图{response => x!响应

这是处理这个问题的正确方法吗?

如果你能为我澄清这一点,我将不胜感激.

dre*_*xin 16

这实际上取决于用例.如果查询不需要序列化,那么您可以在将来执行查询并将结果发送回发件人,如下所示:

import scala.concurrent.{ future, blocking}
import akka.pattern.pipe

val resFut = future {
  blocking {
    executeQuery()
  }
}

resFut pipeTo sender
Run Code Online (Sandbox Code Playgroud)

您还可以专门为DB调用创建专用调度程序,并使用路由器创建actor.这样,您还可以轻松限制并发数据库请求的数量.

  • 使用`blocking`在代码周围允许运行时调整它的行为并从池中删除阻塞线程以允许池启动新线程,从而防止池耗尽. (5认同)

ya_*_*ser 15

真的很棒的介绍"新手的Scala指南第14部分:并发的演员方法" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the- actor-approach-to-concurrency.html.

Actor接收消息,将阻塞代码包装到未来,在它的Future.onSuccess方法中 - 使用其他异步消息发送结果.但请注意发送方变量可能会发生变化,因此请将其关闭(在将来的对象中进行本地引用).

ps:新手的斯卡拉指南 - 真是太棒了.

更新:(添加示例代码)

我们有工人和经理.经理设定要完成的工作,工人报告"得到它"并开始长时间的过程(睡眠1000).同时系统ping经理,消息"活着",经理用工作人员打电话给他们.完成工作后 - 工人会通知经理.

注意:在导入的"默认/全局"线程池执行程序中执行sleep 1000 - 您可以获得线程饥饿.注意:val commander = sender需要"关闭"对原始发件人的引用,因为当onSuccess将被执行时 - actor中的当前发件人可能已经设置为其他'发件人'......

日志:

01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!
Run Code Online (Sandbox Code Playgroud)

码:

import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global

object Sample {

  private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")

  def printWithTime(msg: String) = {
    println(fmt.format(new Date()) + " " + msg)
  }

  class WorkerActor extends Actor {
    protected def receive = {
      case "now" =>
        val commander = sender
        printWithTime("worker: got command")
        future {
          printWithTime("worker: started")
          Thread.sleep(1000)
          printWithTime("worker: done")
        }(ExecutionContext.Implicits.global) onSuccess {
          // here commander = original sender who requested the start of the future
          case _ => commander ! "done" 
        }
        commander ! "working"
      case "alive?" =>
        printWithTime("worker: alive")
    }
  }

  class ManagerActor(worker: ActorRef) extends Actor {
    protected def receive = {
      case "do" =>
        worker ! "now"
        printWithTime("manager: flush sent")
      case "working" =>
        printWithTime("manager: resource allocated")
      case "done" =>
        printWithTime("manager: work is done")
      case "alive?" =>
        printWithTime("manager alive")
        worker ! "alive?"
    }
  }

  def main(args: Array[String]) {

    val config = ConfigFactory.parseString("" +
      "akka.loglevel=DEBUG\n" +
      "akka.debug.lifecycle=on\n" +
      "akka.debug.receive=on\n" +
      "akka.debug.event-stream=on\n" +
      "akka.debug.unhandled=on\n" +
      ""
    )

    val system = ActorSystem("mine", config)
    val actor1 = system.actorOf(Props[WorkerActor], "worker")
    val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")

    actor2 ! "do"
    actor2 ! "alive?"
    actor2 ! "alive?"
    actor2 ! "alive?"

    printWithTime("Humming ...")
    Thread.sleep(5000)
    printWithTime("Shutdown!")
    system.shutdown()

  }
}
Run Code Online (Sandbox Code Playgroud)