Akka:守护异步资源

Arn*_*sen 0 io asynchronous scala future akka

虽然昨天纠正了我的SRP方式的错误,但我仍然想知道如何干净地保证对akka中的异步资源的单线程访问,例如文件句柄.显然,我不希望允许从不同的线程调度多个读写操作,但如果我的actor在该文件上调用了基于未来的API,那么可能会发生这种情况.

我想出的最好的模式是这样的:

trait AsyncIO {
  def Read(offset: Int, count: Int) : Future[ByteBuffer] = ???
}

object GuardedIOActor {
  case class Read(offset: Int, count: Int)
  case class ReadResult(data: ByteBuffer)
  private case class ReadCompleted()
}

class GuardedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._
  var caller :Option[ActorRef] = None

  def receive = {
    case Read(offset,count) => caller match {
      case None => {
        caller = Some(sender)
        Read(offset,count).onSuccess({
          case data => {
            self ! ReadCompleted()
            caller.get ! ReadResult(data)
          }
        })
      }
      case Some(_) => {
        stash()
      }
    }
    case ReadCompleted() => {
      caller = None
      unstashAll()
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

但是这个要求对我来说不够深奥,不足以让我推出那种kludge.我的意思是应该有大量资源需要同步访问,但有一个异步API.我是否会忽略一些常见的命名模式?

Dan*_*mon 6

我认为你的解决方案的要点并不是那么糟糕,但你可以通过使用context.become以下方式让你的演员表现得更像状态机:

class GaurdedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._

  def receive = notReading

  def notReading: Receive = {
    case Read(offset, count) => {
      val caller = sender
      Read(offset,count).onSuccess({
        case data => {
          self ! ReadCompleted()
          caller ! ReadResult(data)
        }
      })
      context.become(reading)
    }
  }

  def reading: Receive = {
    case r: Read => stash()
    case ReadCompleted() => {
      context.become(notReading)
      unstashAll()
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

现在你的演员有两个定义明确的状态,没有必要 var