小编Arn*_*sen的帖子

用于处理接收中的异步操作的Akka模式

我有一个接收指标数据点的Actor,并定期聚合并将它们保存到磁盘.后一种操作会进行I/O操作,因此我不想使用阻塞操作.但是,如果我将其切换为异步,如何在聚合完成之前阻止接收其他数据点而不阻塞某处.

我见过的一种模式是使用Stash,如下所示:

class Aggregator extends Actor with Stash {
  def receive = processing

  def processing: Receive = {
    case "aggregate" => {
      context.become(aggregating)
      aggregate().onComplete {
        case Success => self ! "aggregated"
        case Failure => self ! "aggregated"
      }
    }
    case msg => ??? // Process task
  }

  def aggregating: Receive = {
    case "aggregated" =>
      unstashAll()
      context.become(processing)
    case msg =>
      stash()
  }
}
Run Code Online (Sandbox Code Playgroud)

我对此的不满是我的集体行动的完成只是任何人都可以发送的信息.据我了解,我不能在我的未来完成中影响"不合适".

作为旁注,我无法确定完成onComplete是否以某种方式由同一个调度程序执行receive,因为如果它们不是,则完成会破坏演员提供的单线程保护.

或者是否有更好的模式来完成非同步和直接内部的操作,receive同时保证我的状态在完成之前不能被更改?似乎这种情况任何时候演员状态处理任何类型的I/O(如数据库),显然你会想要避免同步I/O,如果可以的话.

io asynchronous scala akka

2
推荐指数
1
解决办法
525
查看次数

scala类型签名允许成员上的数字操作

我有两个基本相同的case类,除了一个有Int和另一个Double成员.我正在尝试创建一个共同的特征,我可以用它来操作任何一个函数并允许基本的数字操作.但到目前为止,我无法弄清楚如何使用Numeric或Ops类型:

trait Sentiment[A] {
  def positive: A
  def negative: A
}

case class IntSentiment(positive: Int, negative: Int) extends Sentiment[Int]
Run Code Online (Sandbox Code Playgroud)

我想提出一个函数约束,以便我可以对成员执行数字操作,类似于:

def effective[A](sentiment: Sentiment[A]): A = {
  sentiment.positive - sentiment.negative
}
Run Code Online (Sandbox Code Playgroud)

这不起作用,我想我需要以Numeric某种方式调用类型类,但我最接近的是:

def effective[A](sentiment: Sentiment[A])(implicit num: Numeric[A]): A = {
  num.minus(sentiment.positive,sentiment.negative)
}
Run Code Online (Sandbox Code Playgroud)

是否有类型约束/签名Sentiment和/或effective我可以定义实际上直接使用+-操作成员?

scala typeclass generic-constraints

2
推荐指数
1
解决办法
302
查看次数

如何使用PartialFunction.applyOrElse

我有一个PartialFuncton[Throwable,Future[Result]]名为errorMap的函数,用于将throwable转换为结果或将来失败.我可以通过做liftgetOrElse这样的:

val x: Future[Result] = errorMap.lift(e).getOrElse(Future.failed(e))
Run Code Online (Sandbox Code Playgroud)

我认为同样应该可以实现applyOrElse,但我似乎无法弄清楚如何调用它来实现这一目标.我误解了什么applyOrElse是为了什么?

scala partialfunction

2
推荐指数
1
解决办法
2081
查看次数

我无法在Scala中找到这一行:s"label($ name)"

我已经在其他答案中寻找了这个,但似乎这个符号$会使搜索引擎跳起来,并且很难找到"那是s做什么".我试过$引用但是我猜我做错了.

无论如何,我在一个类中有这个方法:

class Label(val name: String) {
  override def toString = s"Label($name)"
}
Run Code Online (Sandbox Code Playgroud)

这里有很多混乱.

s引号前的那个是什么?根据我的理解,该类Label在其构造中接收一个String,所以我甚至不确定为什么一个toString方法是必要的.

看起来非常明显,这$name是持有StringLabel收到的名称变量,但为什么$呢?我怀疑它与变量是一个字段有什么关系?本地与全球?但是在Google上寻找这个让我无处可去.有人可以破译这种方法吗?

scala

2
推荐指数
1
解决办法
77
查看次数

您可以从输入数据动态生成 ScalaTest 的测试名称吗?

我有许多通过相同 ScalaTest 单元测试运行的测试数据集。如果每个测试数据集都是它自己的一组命名测试,我会很高兴,所以如果一个数据集在其中一个测试中失败,我确切地知道它是哪一个,而不是去单个测试并查看它失败的文件。我似乎无法找到在运行时生成测试名称的方法。我看过基于属性和表的测试,目前正在使用should behave like共享装置,但这些似乎都没有做我想要的。

我是不是在 ScalaTest 中没有发现正确的测试方法,或者这是不可能的?

scala scalatest

1
推荐指数
2
解决办法
1982
查看次数

Akka:守护异步资源

虽然昨天纠正了我的SRP方式的错误,但我仍然想知道如何干净地保证对akka中的异步资源的单线程访问,例如文件句柄.显然,我不希望允许从不同的线程调度多个读写操作,但如果我的actor在该文件上调用了基于未来的API,那么可能会发生这种情况.

我想出的最好的模式是这样的:

trait AsyncIO {
  def Read(offset: Int, count: Int) : Future[ByteBuffer] = ???
}

object GuardedIOActor {
  case class Read(offset: Int, count: Int)
  case class ReadResult(data: ByteBuffer)
  private case class ReadCompleted()
}

class GuardedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._
  var caller :Option[ActorRef] = None

  def receive = {
    case Read(offset,count) => caller match {
      case None => {
        caller = Some(sender)
        Read(offset,count).onSuccess({
          case data => {
            self ! ReadCompleted()
            caller.get ! ReadResult(data)
          } …
Run Code Online (Sandbox Code Playgroud)

io asynchronous scala future akka

0
推荐指数
1
解决办法
229
查看次数