猫效应和异步IO细节

uku*_*ele 11 scala scala-cats cats-effect

几天来,我一直在围绕猫效应和IO.而且我觉得我对这种效果有一些误解,或者我只是错过了它的观点.

  1. 首先 - 如果IO可以取代Scala的Future,我们如何创建异步IO任务?用IO.shift?用IO.async?是IO.delay同步还是异步?我们可以用这样的代码创建一个通用的异步任务Async[F].delay(...)吗?或者当我们用unsafeToAsync或调用IO时发生异步unsafeToFuture
  2. 猫效应中Async和Concurrent的重点是什么?为什么他们分开?
  3. IO是绿线吗?如果是的话,为什么在猫效应中有一个Fiber对象?据我所知,Fiber是绿色线程,但是文档声称我们可以将IO视为绿色线程.

我很感激有些澄清任何一个,因为我没有理解猫效应文档对那些和互联网没有帮助...

Ole*_*cov 24

如果IO可以取代Scala的Future,我们如何创建异步IO任务

首先,我们需要澄清什么是异步任务.通常async意味着"不会阻止操作系统线程",但是因为你提到Future它,它有点模糊.说,如果我写道:

Future { (1 to 1000000).foreach(println) }
Run Code Online (Sandbox Code Playgroud)

它不会是异步的,因为它是一个阻塞循环和阻塞输出,但它可能会在不同的OS线程上执行,由隐式ExecutionContext管理.等效的猫效应代码将是:

for {
  _ <- IO.shift
  _ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()
Run Code Online (Sandbox Code Playgroud)

(这不是较短的版本)

所以,

  • IO.shift用于改变线程/线程池.Future在每次操作中都会这样做,但它不是免费的性能.
  • IO.delay{...}(又名IO { ... })确实作任何异步和不切换线程.它用于IO从同步副作用API 创建简单值

现在,让我们回到真正的异步.这里要理解的是:

     每个异步计算都可以表示为一个回调函数.

无论您使用的是返回的API Future还是Java CompletableFuture,或类似NIO CompletionHandler,它都可以转换为回调.这是IO.async为了:您可以将任何回调函数转换为IO.如果像:

for {
  _ <- IO.async { ... }
  _ <- IO(println("Done"))
} yield ()
Run Code Online (Sandbox Code Playgroud)

Done将仅在(和如果)...回调中的计算时打印.您可以将其视为阻止绿色线程,但不是OS线程.

所以,

  • IO.async用于将任何已经异步的计算转换为IO.
  • IO.delay用于将任何完全同步的计算转换为IO.
  • 具有真正异步计算的代码就像阻止绿色线程一样.

使用Futures 时最接近的类比是创建scala.concurrent.Promise并返回p.future.


或者当我们使用unsafeToAsync或unsafeToFuture调用IO时发生异步?

有点.随着IO,没有任何情况发生,除非你调用其中的一个(或使用IOApp).但IO并不保证你会在不同的操作系统线程上执行,甚至不能异步执行,除非你用IO.shift或明确要求这样做IO.async.

您可以随时保证线程切换,例如(IO.shift *> myIO).unsafeRunAsyncAndForget().这是可能正是因为myIO不会被执行,直到自找的,你是否有它val myIOdef myIO.

但是,您无法将阻塞操作神奇地转换为非阻塞操作.这是不可能的也没有用Future,也不符合IO.


猫效应中Async和Concurrent的重点是什么?为什么他们分开?

AsyncConcurrent(和Sync)是类型类.它们的设计使程序员可以避免被锁定,cats.effect.IO并且可以为您提供支持您选择的任何API,例如monix Task或Scalaz 8 ZIO,甚至monad变换器类型,例如OptionT[Task, *something*].像fs2,monix和http4s这样的库利用它们为你提供了更多选择.

Concurrent增加额外的东西Async,其中最重要的是.cancelable.start.这些没有直接比喻Future,因为它根本不支持取消.

.cancelable是一个版本.async允许您还指定一些逻辑来取消您正在包装的操作.一个常见的例子是网络请求 - 如果您对结果不再感兴趣,您可以在不等待服务器响应的情况下中止它们,并且不会浪费任何套接字或处理读取响应的时间.你可能永远不会直接使用它,但它有它的位置.

但是如果你不能取消它们,那么可取消的操作有什么用?这里的关键观察是你无法从内部取消操作.其他人必须做出这个决定,这将与操作本身同时发生(这是类型类得名的地方).这就是.start进来的地方.简而言之,

      .start 是一个绿色线程的显式分支.

做就像someIO.startval t = new Thread(someRunnable); t.start(),除了它现在是绿色.而Fiber本质上是一个精简版ThreadAPI:你可以做.join,这就好比Thread#join(),但它不会阻止操作系统线程; 而且.cancel,这是安全版本.interrupt().


请注意,还有其他方法来分叉绿色线程.例如,执行并行操作:

val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)
Run Code Online (Sandbox Code Playgroud)

将所有ID分叉处理为绿色线程,然后将它们全部加入.或使用.race:

val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???

val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)
Run Code Online (Sandbox Code Playgroud)

将并行执行提取,为您提供第一个结果并自动取消较慢的提取.因此,执行.start和使用Fiber并不是分叉更多绿色线程的唯一方法,只是最明确的线程.答案是:

IO是绿线吗?如果是的话,为什么在猫效应中有一个Fiber对象?据我所知,Fiber是绿色线程,但是文档声称我们可以将IO视为绿色线程.

  • IO 就像一个绿色线程,这意味着你可以有很多并行运行而不需要OS线程的开销,并且for-comprehension中的代码就像阻塞计算结果一样.

  • Fiber 是一个用于控制绿色线程显式分叉(等待完成或取消)的工具.

  • @uMdRupert 事物可能看起来不确定地交错,但这种外观可以通过拥有足够的操作系统线程来安排 Future 来提供。不过,操作系统线程的开销并不小,如果只有一个,则不会看到任何交错。我的意思是,future 的主体将在循环开始时声明一个线程,并且在循环完成之前不允许在该线程上完成任何其他工作。与“Future.traverse(1 to 1000)(a =&gt; Future(println(a)))”相比 - 这个线程应该时不时地释放一个线程,然后声明一个新线程以进行更多工作。 (2认同)