use*_*858 6 scala scala-cats doobie
我正在尝试在doobie存储库的代码中隐式添加Async和Sync。Sync和Async [F]可以正常工作IO。我想将它们转换为未来并面临问题
我试图从IO创建自己的Aync
def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()
override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
throw new Exception("Not implemented Future.asyncF")
override def suspend[A](thunk: => Future[A]): Future[A] = thunk
override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
throw new Exception("Not implemented Future.bracketCase")
override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)
override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))
override def pure[A](x: A): Future[A] = F.pure(x)
override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)
override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
}
Run Code Online (Sandbox Code Playgroud)
我对asyncF和bracketCase中的两个函数的实现感到震惊,请问有什么可以帮助的吗?
Tra*_*own 13
正如Reactormonk在上面的评论中所说,不可能编写具有正确语义的Asyncfor 实例Future,因为Asyncextends Sync并Sync需要可重复运行的计算的表示形式,而Scala的Future在定义和运行时就开始运行。不能重新运行。
不过,亲自了解这一点很有启发性,我鼓励您尝试编写自己的可编译但(必要时)非法的Async[Future]实例,而无需查看下一个代码块。但是,出于示例的目的,这是我脑海中的一个快速草图:
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}
def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
IO.async(k).unsafeToFuture()
def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
val p = Promise[A]()
val f = k {
case Right(a) => p.success(a)
case Left(e) => p.failure(e)
}
f.flatMap(_ => p.future)
}
def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten
def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
release: (A, ExitCase[Throwable]) => Future[Unit]
): Future[B] = acquire.flatMap { a =>
use(a).transformWith {
case Success(b) => release(a, ExitCase.Completed).map(_ => b)
case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
}
}
def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
fa.recoverWith { case t => f(t) }
def pure[A](x: A): Future[A] = Future.successful(x)
def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
case Right(b) => Future.successful(b)
case Left(a) => tailRecM(a)(f)
}
}
Run Code Online (Sandbox Code Playgroud)
这样可以很好地编译,并且可能在某些情况下可以工作(但请不要实际使用它!)。但是,我们说过它没有正确的语义,我们可以通过使用cats-effect的laws模块来证明这一点。
首先,我们需要一些不需要您担心的东西:
import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary
implicit val throwableEq: Eq[Throwable] = Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
Arbitrary(Arbitrary.arbitrary[Exception].map(identity))
implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
new Eq[Future[A]] {
private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
f.map(Right(_)).recover { case e => Left(e) }
def eqv(fx: Future[A], fy: Future[A]): Boolean =
scala.concurrent.Await.result(
liftToEither(fx).zip(liftToEither(fy)).map {
case (rx, ry) => rx === ry
},
scala.concurrent.duration.Duration(1, "second")
)
}
Run Code Online (Sandbox Code Playgroud)
然后,我们可以定义一个检查Async实例法律的测试:
import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline
object FutureAsyncSuite extends FunSuite with Discipline {
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val params: Parameters =
Parameters.default.copy(allowNonTerminationLaws = false)
checkAll(
"Async",
AsyncTests[Future](futureAsync).async[String, String, String]
)
}
Run Code Online (Sandbox Code Playgroud)
然后我们可以运行法律测试:
scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...
Run Code Online (Sandbox Code Playgroud)
您会看到大多数测试都是绿色的。这个实例可以解决很多问题。
但是,它确实显示了三个失败的测试,包括以下内容:
- Async.async.repeated sync evaluation not memoized *** FAILED ***
GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
(Discipline.scala:14)
Falsified after 1 successful property evaluations.
Location: (Discipline.scala:14)
Occurred when passed generated values (
arg0 = "???",
arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
)
Label of failing property:
Expected: Future(Success(????????))
Received: Future(Success(???))
Run Code Online (Sandbox Code Playgroud)
如果查看法则定义,您会看到这是一个测试,它使用定义了一个Future值,delay然后多次对其进行排序,如下所示:
val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)
change *> change *> read
Run Code Online (Sandbox Code Playgroud)
其他两个失败是类似的“未记录”违规。这些测试应该看到副作用发生了两次,但是在我们的情况下,不可能编写delay或suspend以Future这种方式进行写(尽管值得尝试,以使自己确信是这种情况)。
综上所述:您可以编写一个Async[Future]实例,使其通过78项Async法律测试中的75项,但是不可能编写通过所有这些测试的实例,并且使用非法实例是一个非常糟糕的主意:两个潜在用户您的代码和库(例如Doobie)将假定您的实例合法,并且如果您不遵守此假设,则可能会打开复杂而烦人的错误之门。
值得注意的是,为此编写一个Future具有合法Async实例的最小包装并不难(例如Rerunnable,我的catbird库中为Twitter的未来提供了一个包装)。cats.effect.IO不过,您确实应该坚持使用,并使用提供的转换在使用传统的Future基于API的代码的任何部分中与期货进行来回转换。