标签: cats-effect

IO Monad 与 Reader Monad

我想知道 IO Monad 和 Reader monad 之间有什么关系吗?

Reader monad 中的环境可以产生效果吗?如果是这样,ZIO 或 Cats IO、scalaz-effects 如何处理 Reader Monad 原则?

scalaz scala-cats cats-effect zio

1
推荐指数
1
解决办法
699
查看次数

Cats Effect 的 IO.suspend 函数到底有什么作用?

猫效应有什么作用IO.suspend以及为什么有用?有文档,但并不完全清楚。

文档给出了以下用例:

import cats.effect.IO

def fib(n: Int, a: Long, b: Long): IO[Long] =
  IO.suspend {
    if (n > 0)
      fib(n - 1, b, a + b)
    else
      IO.pure(a)
  }
Run Code Online (Sandbox Code Playgroud)

举个例子,为什么我要使用上面的函数,而不是下面的类似函数?

import cats.effect.IO

import scala.annotation.tailrec

@tailrec
def fib(n: Int, a: Long, b: Long): IO[Long] =
  if (n > 0)
    fib(n -1, b, a + b)
  else
    IO.pure(a)

Run Code Online (Sandbox Code Playgroud)

functional-programming scala scala-cats cats-effect

1
推荐指数
1
解决办法
742
查看次数

使用scala-cats IO类型封装可变Java库

我知道,一般来说,关于决定要建模什么效果,有很多话要说。这个讨论是在 Scala 函数式编程中关于 IO 的章节中介绍的。

\n

尽管如此,我还没有读完这一章,我只是从头到尾地浏览了一遍,然后才和 Cats IO 一起阅读。

\n

与此同时,我在工作中遇到了一些需要很快交付的代码的情况。\n它依赖于一个与突变有关的 Java 库。该库是很久以前开始的,由于遗留原因,我没有看到它们发生变化。

\n

无论如何,长话短说。实际上,将任何变异函数建模为 IO 是封装变异 Java 库的可行方法吗?

\n

Edit1(根据要求我添加一个片段)

\n

准备好建立一个模型,改变模型而不是创建一个新模型。例如,我会将 jena 与 gremlin 进行对比,gremlin 是一个基于图形数据的功能库。

\n
def loadModel(paths: String*): Model =\n    paths.foldLeft(ModelFactory.createOntologyModel(new OntModelSpec(OntModelSpec.OWL_MEM)).asInstanceOf[Model]) {\n      case (model, path) \xe2\x87\x92\n        val input = getClass.getClassLoader.getResourceAsStream(path)\n        val lang  = RDFLanguages.filenameToLang(path).getName\n        model.read(input, "", lang)\n    }\n
Run Code Online (Sandbox Code Playgroud)\n

那是我的 scala 代码,但是网站中记录的 java api 看起来像这样。

\n
// create the resource\nResource r = model.createResource();\n\n// add the property\nr.addProperty(RDFS.label, model.createLiteral("chat", "en"))\n .addProperty(RDFS.label, model.createLiteral("chat", "fr"))\n .addProperty(RDFS.label, model.createLiteral("<em>chat</em>", true));\n\n// …
Run Code Online (Sandbox Code Playgroud)

scala scala-cats cats-effect

1
推荐指数
1
解决办法
665
查看次数

fs2 - 与 2 个流共享 Ref

Ref[F, A]我正在尝试在两个并发流之间共享。下面是实际场景的简化示例。

  class Container[F[_]](implicit F: Sync[F]) {
    private val counter = Ref[F].of(0)

    def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))

    def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
  }
Run Code Online (Sandbox Code Playgroud)

在主函数中,

object MyApp extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    val s = for {
      container <- Ref[IO].of(new Container[IO]())
    } yield {
      val incrementBy2 = Stream.repeatEval(
          container.get
            .flatTap(c => c.incrementBy2)
            .flatMap(c => container.update(_ => c))
        )
        .metered(2.second)
        .interruptScope

      val printStream = Stream.repeatEval(
          container.get
            .flatMap(_.printCounter)
        )
        .metered(1.seconds) …
Run Code Online (Sandbox Code Playgroud)

scala scala-cats fs2 cats-effect

1
推荐指数
1
解决办法
614
查看次数

尝试模拟 cats.effect.IO 时出现 Mockito 错误

我试图cats.effect.IO嘲笑

val ioSql: IO[Sql[IO, SqlConnection[IO]]] = mock[IO[Sql[IO, SqlConnection[IO]]]]
Run Code Online (Sandbox Code Playgroud)

SqlSqlConnection我公司的图书馆。

我有这个错误

Underlying exception : java.lang.reflect.MalformedParameterizedTypeException
------------------------------------------------------------
    org.mockito.exceptions.base.MockitoException: 
    Mockito cannot mock this class: class cats.effect.IO.
    
    Mockito can only mock non-private & non-final classes.
    If you're not sure why you're getting this error, please report to the mailing list.
    
    
    Java               : 1.8
    JVM vendor name    : Azul Systems, Inc.
    JVM vendor version : 25.192-b01
    JVM name           : OpenJDK 64-Bit Server VM
    JVM version        : 1.8.0_192-b01
    JVM info           : …
Run Code Online (Sandbox Code Playgroud)

scala mockito cats-effect

0
推荐指数
1
解决办法
547
查看次数

fs2.Stream 挂起两次

问题:

我想反复从fs2.Stream某些第三方库提供的一些批次中取出一些批次,因此将客户从fs2.Stream自身中抽象出来并简单地给它们F[List[Int]]在它们准备好后立即批处理。

尝试: 我尝试使用fs2.Stream::take并运行一些示例。

一世。

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()

r.unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)

它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。我预计从0到的所有批次都1000将被打印。

在这里让事情更简单一点是

二、

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = …
Run Code Online (Sandbox Code Playgroud)

functional-programming scala scala-cats fs2 cats-effect

0
推荐指数
1
解决办法
31
查看次数

使用基于猫效应的工作人员时如何避免竞争状况?

我创建了一个微型工作系统来以最大的多核处理器利用率运行并行作业。它似乎工作正常,但在某些时候,当处理大量作业时,会出现错误(没有错误消息,只是挂起),我怀疑这是低级竞争条件。我无法确定这是否是我用来实现并行性的 cats-effect 的错误,还是 Atomic 或 TrieMap 的错误。

这是一个缩小的实现,可用于说明和测试该问题:

import cats.effect.IO
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.concurrent.TrieMap
import cats.effect.unsafe.implicits.global
import java.util.concurrent.ConcurrentHashMap


object ThreadingError extends App:
  val jobIdsAdded = (0L until 10000L).toList
  for (_ <- jobIdsAdded.iterator) {
    ParallelJobs.addJob(() => {})
  }
  while(ParallelJobs.count.get() < 10000L) {
    print(s"${ParallelJobs.count.get()}\r")
    Thread.sleep(200)
  }

object ParallelJobs:
  private val allCores = Runtime.getRuntime.availableProcessors()
  private val availableCores = allCores - 1
  private val assignedTillJobId: AtomicLong = AtomicLong(0L)
  val jobsTrieMap: TrieMap[Long, () => Any] = TrieMap.empty[Long, () => Any]
  val jobsConcurrentHashMap: ConcurrentHashMap[Long, () …
Run Code Online (Sandbox Code Playgroud)

scala scala-cats cats-effect scala-3

0
推荐指数
1
解决办法
318
查看次数

猫效应 3 互斥意外行为

我偶然发现了一个意想不到的cats.effect.std.Mutex行为(它不适用于某些情况)。看来我错过了一些核心理解,Async但还没有找到根本原因。

\n

进口:

\n
import cats.{Applicative, FlatMap}\nimport cats.effect.std.{Console, Mutex}\nimport cats.effect.*\nimport cats.implicits.*\nimport cats.syntax.all.*\n\nimport scala.concurrent.duration.*\n
Run Code Online (Sandbox Code Playgroud)\n

假设我们有一个服务

\n

工作场景

\n
  class Service[F[_] : Async : FlatMap : Console](mutex: Mutex[F]) {\n    def run(name: String): F[Unit] =\n      for\n        _ \xe2\x86\x90 Console[F].println(s"[$name] entered [run] within [${Thread.currentThread().getName}]")\n        _ \xe2\x86\x90 mutex.lock.surround {\n          for {\n            _ \xe2\x86\x90 Console[F].println(s"[$name] entered [locked] within [${Thread.currentThread().getName}]")\n            _ \xe2\x86\x90 Async[F].sleep(2.seconds)\n            _ \xe2\x86\x90 Console[F].println(s"[$name] almost left [locked] within [${Thread.currentThread().getName}]")\n          } yield ()\n        }\n        _ \xe2\x86\x90 Console[F].println(s"[$name] left [run] within [${Thread.currentThread().getName}]")\n …
Run Code Online (Sandbox Code Playgroud)

scala cats-effect

0
推荐指数
1
解决办法
50
查看次数