如何在期货中使用Scala ARM?

dan*_*mak 9 scala future resource-management

我想实现ARM(自动资源管理)模式,其中资源是异步使用的.

问题

假设我的资源看起来像:

class MyResource { 
  def foo() : Future[MyResource] = ???
  // Other methods returning various futures
  def close() : Unit = ???
}
object MyResource { 
  def open(name: String): Future[MyResource] = ???
} 
Run Code Online (Sandbox Code Playgroud)

所需的使用模式是:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
  r.foo() /* map ... */ andThen {
    case _ => r.close()
  }
})
Run Code Online (Sandbox Code Playgroud)

省略的映射函数可能很复杂,涉及分支和链接期货,这些期货会重复调用r返回期货的方法.

我想确保r.close()所有未来的延续完成(或失败)后调用.在每个呼叫站点手动执行此操作非常容易出错.这需要ARM解决方案.

试图解决方案

scala-arm库通常是同步的.这段代码不会做正确的事情,因为在块内的期货完成之前会调用close():

for (r <- managed(MyResource.open("name"))) {
  r map (_.foo()) // map ...
}
Run Code Online (Sandbox Code Playgroud)

我虽然使用这个包装器:

def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
  opener flatMap { 
    myr => body(myr) andThen { case _ => myr.close() } }
Run Code Online (Sandbox Code Playgroud)

然后呼叫站点看起来像:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo // map ...
})
Run Code Online (Sandbox Code Playgroud)

但是,块中的代码将负责返回在该块创建的所有其他期货完成时完成的Future.如果它意外没有,那么资源将在所有使用它的期货完成之前关闭.并且没有静态验证来捕获此错误.例如,这将是一个运行时错误:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // Do one thing
  myr.bar() // Do another
})
Run Code Online (Sandbox Code Playgroud)

怎么解决这个?

显然,我可以使用scala-arm的分隔连续支持(CPS).它看起来有点复杂,我害怕弄错.它需要启用编译器插件.此外,我的团队对scala非常陌生,我不想要求他们使用CPS.

CPS是前进的唯一途径吗?是否有一个图书馆或设计模式可以更简单地使用Futures,或者使用scala-arm执行此操作的示例?

reg*_*win 2

Reactive Extensions (Rx) could be an alternative solution.\nThere is increasing momentum around this programming paradigm, that is now available in many languages including Scala.

\n\n

Rx 的基础是创建一个 Observable,它是异步事件的来源。Observable 可以以复杂的方式链接起来,这就是它的力量。您订阅 Observable 来监听 onNext、onError 和 onComplete 事件。您还会收到一份订阅退款,允许您取消。

\n\n

我认为您可能会在 onCompleted 和/或 onError 处理程序中添加一个 resources.close() 调用。

\n\n

请参阅 RxScala 文档了解:

\n\n
Observable.subscribe(\n    onNext: (T) \xe2\x87\x92 Unit, \n    onError: (Throwable) \xe2\x87\x92 Unit, \n    onCompleted: () \xe2\x87\x92 Unit): Subscription\n
Run Code Online (Sandbox Code Playgroud)\n\n

更多信息:

\n\n\n