当演员状态只增加时,我可以使用akka持久性吗?

use*_*095 5 event-sourcing akka-persistence

我正在玩akka持久性试图实现一个服务,其中我的状态可能非常大(好吧,它说它不适合RAM)一些实体的列表.让我们说用户希望所有实体的所有历史记录都可用.我可以在akka持久性中做到这一点吗?

现在我的演员状态看起来像那样.

case class System(var processes: Map[Long, Process] = Map()) {

  def updated(event: Event): System = event match {
    case ProcessDetectedEvent(time, activitySets, id, processType) =>
      val process = Process(activitySets.coordinates, time, activitySets.channels, id, processType, false)
      copy(processes = processes + (id -> process))

    case ProcessMovedEvent(id, activitySets, time) =>
      val process = Process(activitySets.coordinates, time, activitySets.channels, id, processes(id).processType, false)
      copy(processes = processes + (id -> process))

    case ProcessClosedEvent(time, id) =>
      val currentProcess = processes(id)
      val process = Process(currentProcess.coordinates, time, currentProcess.channels, id, currentProcess.processType, true)
      copy(processes = processes + (id -> process))
    case _ => this
  }

}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,进程映射存储在内存中,因此如果进程数量很大,应用程序可能会耗尽内存.

Sre*_*har 0

有状态 Actor 使用 Akka 持久性来在 Actor 启动、JVM 崩溃后或由 Supervisor 重新启动或在集群中迁移时恢复其内部状态。在这种情况下,从长远来看,应用程序/JVM 可能会因 OutOfMemory 异常而崩溃。当这个参与者重新启动时,持久性恢复机制将在映射中重新创建进程的所有信息。但总内存又会很高,应用程序在运行时可能会再次崩溃。因此,在这种情况下,持久性无助于避免应用程序崩溃,除非您仅保留部分进程列表以减少内存。

所以首先你需要想办法解决这个内存异常。也许您可以尝试以下选项。

  1. 发生 OutOfMemory 异常后,在 JVM 重新启动期间尝试增加 JVM 堆大小。
  2. 恢复状态时,仅重播选定的消息列表,以便使用的总内存较低,但状态不完整。

如果恢复期间要重放的消息列表太大,可以使用快照来减少状态恢复时间。

  • 我认为我的问题不够清楚。我不能有不完整的状态,这就是问题的重点。我已经找到了一种解决方案 - Actor 的内存中没有状态,而是将其直接写入某个用于查询状态的数据库。这基本上就是 CQRS。但我不明白的是为什么基本上所有关于 akka 持久性的教程和示例都没有任何相关信息。就像他们在教程中构建一些客户/订单系统一样,但他们永远不会从内存中删除它们。这很令人困惑。 (2认同)