如何通过ZIO环境在ZIO任务之间共享ZIO队列

jq1*_*727 5 scala zio

我对Scala和ZIO有点陌生,遇到了一个奇怪的难题。

我想设置一个包含ZIO队列的ZIO环境,以后offertake从此共享队列中使用不同的ZIO任务。

我试图这样定义我的环境

    trait MainEnv extends Console with Clock
    {
      val mainQueue = Queue.unbounded[String]
    }
Run Code Online (Sandbox Code Playgroud)

并从这样的单独任务访问队列

    for {
      env      <- ZIO.environment[MainEnv]
      queue    <- env.mainQueue
      ...
Run Code Online (Sandbox Code Playgroud)

但是在测试中,我观察到每个单独的任务都有一个单独的Queue实例。
寻找签名unbounded

  def unbounded[A]: UIO[Queue[A]]
Run Code Online (Sandbox Code Playgroud)

我观察到它并不会立即返回一个队列,而是返回一个返回队列的效果,因此虽然观察到的行为是有道理的,但这根本不是我所希望的,并且我没有看到一种清晰的方式来获得行为我想要。

我们将不胜感激关于如何实现我的目标的任何建议,这些目标是通过环境中存储的共享队列来设置不同的任务进行通信。


供参考的是我的代码和输出。

样本执行

bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main 
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@65b9a444
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@7c050764
Run Code Online (Sandbox Code Playgroud)

(挂在这里-注意环境对象是相同的,但队列对象是不同的,因此第二个任务被卡住了)

/tmp/env-q/test.scala

这是我完整的测试,它基于https://www.slideshare.net/jdegoes/zio-queue的幻灯片37中的示例

  def unbounded[A]: UIO[Queue[A]]
Run Code Online (Sandbox Code Playgroud)

/tmp/env-q/build.sbt

这是我使用的build.sbt

val ZioVersion = "1.0.0-RC13"

lazy val root = (project in file("."))
  .settings(
    organization := "example",
    name := "example",
    version := "0.0.1-SNAPSHOT",
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-Ypartial-unification"),
    libraryDependencies ++= Seq(
      "dev.zio"                 %% "zio"                 % ZioVersion,
    ),
    addCompilerPlugin("org.spire-math" %% "kind-projector"     % "0.9.6"),
    addCompilerPlugin("com.olegpy"     %% "better-monadic-for" % "0.2.4")
  )

scalacOptions ++= Seq(
  "-deprecation",               // Emit warning and location for usages of deprecated APIs.
  "-encoding", "UTF-8",         // Specify character encoding used by source files.
  "-language:higherKinds",      // Allow higher-kinded types
  "-language:postfixOps",       // Allows operator syntax in postfix position (deprecated since Scala 2.10)
  "-feature",                   // Emit warning and location for usages of features that should be imported explicitly.
  "-Ypartial-unification",      // Enable partial unification in type constructor inference
  "-Xfatal-warnings",           // Fail the compilation if there are any warnings
)
Run Code Online (Sandbox Code Playgroud)

jq1*_*727 5

在ZIO Core 的官方Gitter频道中,Adam Fraser建议

您可能想让您的环境只有一个Queue[String],然后您想要使用一种类似provideMwith 的方法Queue.unbounded来创建一个队列并将其提供给您的整个应用程序。那是与之provideM相对的地方provide。它A通过提供来满足需要的环境ZIO[A]

稍微深入研究ZIO源代码,可以看到DefaultTestReporterSpec.scala中的一个有用示例。

通过将环境定义为

  trait MainEnv extends Console with Clock    // environment with queue
  {
    val mainQueue: Queue[String]
  }
Run Code Online (Sandbox Code Playgroud)

改变我的任务,以获得env.mainQueue=不是<-(因为mainQueue是Queue[String]现在,而不是一个UIO[Queue[String]],删除runEnv和更改run方法在我的测试中使用provideSomeM

  def run(args: List[String]) =
    program.provideSomeM(
      for {
        q <- Queue.unbounded[String]
      } yield new MainEnv with Console.Live with Clock.Live {
        override val mainQueue = q
      }
    ).fold(_ => 1, _ => 0)
Run Code Online (Sandbox Code Playgroud)

我能够得到预期的结果:

sbt:example> run
[info] Running example.Main 
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
Give me Coffee!
[success] Total time: 1 s, completed Oct 1, 2019 7:41:47 AM
Run Code Online (Sandbox Code Playgroud)