什么是包装阻塞的最佳方法在Scala的Future [T]中尝试[T]?

Bri*_*Hsu 10 scala

这是问题,我有一个库有阻塞方法返回Try [T].但由于它是阻塞的,我想使用Future [T]使其无阻塞.在未来的块中,我还想计算一些依赖于原始阻塞方法的返回值的东西.

但是如果我使用下面的东西,那么我nonBlocking将返回Future [Try [T]],因为Future [T]已经代表Failure [U]已经不那么说服了,我宁愿把异常传播给Future [T]是自我.

def blockMethod(x: Int): Try[Int] = Try { 
  // Some long operation to get an Int from network or IO
  throw new Exception("Network Exception") }
}

def nonBlocking(x: Int): Future[Try[Int]] = future {
  blockMethod(x).map(_ * 2)
}
Run Code Online (Sandbox Code Playgroud)

这是我尝试过的,我只是.getfuture {}块中使用方法,但我不确定这是否是最好的方法.

def blockMethod(x: Int): Try[Int] = Try { 
  // Some long operation to get an Int from network or IO
  throw new Exception("Network Exception") }
}

def nonBlocking(x: Int): Future[Int] = future {
  blockMethod(x).get * 2
}
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?或者有一种更加scala惯用的方法来将T Try [T]转换为Future [T]?

Noa*_*oah 14

这是一个不阻塞的示例,请注意您可能希望使用自己的执行上下文而不是scala的全局上下文:

import scala.util._
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object Main extends App {

  def blockMethod(x: Int): Try[Int] = Try {
    // Some long operation to get an Int from network or IO
    Thread.sleep(10000)
    100
  }

  def tryToFuture[A](t: => Try[A]): Future[A] = {
    future {
      t
    }.flatMap {
      case Success(s) => Future.successful(s)
      case Failure(fail) => Future.failed(fail)
    }
  }

  // Initiate long operation
  val f = tryToFuture(blockMethod(1))

  println("Waiting... 10 seconds to complete")

  // Should return before 20 seconds...
  val res = Await.result(f, 20 seconds)

  println(res) // prints 100
}
Run Code Online (Sandbox Code Playgroud)


Yur*_*riy 8

在我看来:Try&Future是monadic构造和惯用的方式是monadic组合(for -reherehension):

你需要为Future [Try [_]](你的库的代码)定义monad变换器:

case class FutureT[R](run : Future[Try[R]])(implicit e: ExecutionContext) {
  def map[B](f : R => B): FutureT[B] = FutureT(run map { _ map f })
  def flatMap[B](f : R => FutureT[B]): FutureT[B] = {
    val p = Promise[Try[B]]()
    run onComplete {
      case Failure(e)           => p failure e
      case Success(Failure(e))  => p failure e
      case Success(Success(v))  => f(v).run onComplete {
        case Failure(e)         => p failure e
        case Success(s)         => p success s
      }
    }
    FutureT(p.future)
  }
}

object FutureT {
  def futureTry[R](run : => Try[R])(implicit e: ExecutionContext) = 
    new FutureT(future { run })

  implicit def toFutureT[R](run : Future[Try[R]]) = FutureT(run)
  implicit def fromFutureT[R](futureT : FutureT[R]) = futureT.run
}  
Run Code Online (Sandbox Code Playgroud)

和用法示例:

def blockMethod(x: Int): Try[Int] = Try {
  Thread.sleep(5000)
  if(x < 10) throw new IllegalArgumentException
  else x + 1
} 

import FutureT._  

// idiomatic way :)
val async = for {
  x <- futureTry { blockMethod(15) }
  y <- futureTry { blockMethod(25) }            
} yield (x + y) * 2  // possible due to using modan transformer  

println("Waiting... 10 seconds to complete")

val res = Await.result(async, 20 seconds)
println(res)

// example with Exception 
val asyncWithError = for {
  x <- futureTry { blockMethod(5) }
  y <- futureTry { blockMethod(25) }            
} yield (x + y) * 2  // possible due to using modan transformer  

// Can't use Await because will get exception 
// when extract value from FutureT(Failure(java.lang.IllegalArgumentException))
// no difference between Failure produced by Future or Try
asyncWithError onComplete {
  case Failure(e) => println(s"Got exception: $e.msg")
  case Success(res) => println(res)
}
// Output:
// Got exception: java.lang.IllegalArgumentException.msg
Run Code Online (Sandbox Code Playgroud)