我想知道 IO Monad 和 Reader monad 之间有什么关系吗?
Reader monad 中的环境可以产生效果吗?如果是这样,ZIO 或 Cats IO、scalaz-effects 如何处理 Reader Monad 原则?
猫效应有什么作用IO.suspend以及为什么有用?有文档,但并不完全清楚。
该文档给出了以下用例:
import cats.effect.IO
def fib(n: Int, a: Long, b: Long): IO[Long] =
IO.suspend {
if (n > 0)
fib(n - 1, b, a + b)
else
IO.pure(a)
}
Run Code Online (Sandbox Code Playgroud)
举个例子,为什么我要使用上面的函数,而不是下面的类似函数?
import cats.effect.IO
import scala.annotation.tailrec
@tailrec
def fib(n: Int, a: Long, b: Long): IO[Long] =
if (n > 0)
fib(n -1, b, a + b)
else
IO.pure(a)
Run Code Online (Sandbox Code Playgroud) 我知道,一般来说,关于决定要建模什么效果,有很多话要说。这个讨论是在 Scala 函数式编程中关于 IO 的章节中介绍的。
\n尽管如此,我还没有读完这一章,我只是从头到尾地浏览了一遍,然后才和 Cats IO 一起阅读。
\n与此同时,我在工作中遇到了一些需要很快交付的代码的情况。\n它依赖于一个与突变有关的 Java 库。该库是很久以前开始的,由于遗留原因,我没有看到它们发生变化。
\n无论如何,长话短说。实际上,将任何变异函数建模为 IO 是封装变异 Java 库的可行方法吗?
\n准备好建立一个模型,改变模型而不是创建一个新模型。例如,我会将 jena 与 gremlin 进行对比,gremlin 是一个基于图形数据的功能库。
\ndef loadModel(paths: String*): Model =\n paths.foldLeft(ModelFactory.createOntologyModel(new OntModelSpec(OntModelSpec.OWL_MEM)).asInstanceOf[Model]) {\n case (model, path) \xe2\x87\x92\n val input = getClass.getClassLoader.getResourceAsStream(path)\n val lang = RDFLanguages.filenameToLang(path).getName\n model.read(input, "", lang)\n }\nRun Code Online (Sandbox Code Playgroud)\n那是我的 scala 代码,但是网站中记录的 java api 看起来像这样。
\n// create the resource\nResource r = model.createResource();\n\n// add the property\nr.addProperty(RDFS.label, model.createLiteral("chat", "en"))\n .addProperty(RDFS.label, model.createLiteral("chat", "fr"))\n .addProperty(RDFS.label, model.createLiteral("<em>chat</em>", true));\n\n// …Run Code Online (Sandbox Code Playgroud) Ref[F, A]我正在尝试在两个并发流之间共享。下面是实际场景的简化示例。
class Container[F[_]](implicit F: Sync[F]) {
private val counter = Ref[F].of(0)
def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))
def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
}
Run Code Online (Sandbox Code Playgroud)
在主函数中,
object MyApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val s = for {
container <- Ref[IO].of(new Container[IO]())
} yield {
val incrementBy2 = Stream.repeatEval(
container.get
.flatTap(c => c.incrementBy2)
.flatMap(c => container.update(_ => c))
)
.metered(2.second)
.interruptScope
val printStream = Stream.repeatEval(
container.get
.flatMap(_.printCounter)
)
.metered(1.seconds) …Run Code Online (Sandbox Code Playgroud) 我试图cats.effect.IO嘲笑
val ioSql: IO[Sql[IO, SqlConnection[IO]]] = mock[IO[Sql[IO, SqlConnection[IO]]]]
Run Code Online (Sandbox Code Playgroud)
Sql是SqlConnection我公司的图书馆。
我有这个错误
Underlying exception : java.lang.reflect.MalformedParameterizedTypeException
------------------------------------------------------------
org.mockito.exceptions.base.MockitoException:
Mockito cannot mock this class: class cats.effect.IO.
Mockito can only mock non-private & non-final classes.
If you're not sure why you're getting this error, please report to the mailing list.
Java : 1.8
JVM vendor name : Azul Systems, Inc.
JVM vendor version : 25.192-b01
JVM name : OpenJDK 64-Bit Server VM
JVM version : 1.8.0_192-b01
JVM info : …Run Code Online (Sandbox Code Playgroud) 问题:
我想反复从fs2.Stream某些第三方库提供的一些批次中取出一些批次,因此将客户从fs2.Stream自身中抽象出来并简单地给它们F[List[Int]]在它们准备好后立即批处理。
尝试:
我尝试使用fs2.Stream::take并运行一些示例。
一世。
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)
它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。我预计从0到的所有批次都1000将被打印。
在这里让事情更简单一点是
二、
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = …Run Code Online (Sandbox Code Playgroud) 我创建了一个微型工作系统来以最大的多核处理器利用率运行并行作业。它似乎工作正常,但在某些时候,当处理大量作业时,会出现错误(没有错误消息,只是挂起),我怀疑这是低级竞争条件。我无法确定这是否是我用来实现并行性的 cats-effect 的错误,还是 Atomic 或 TrieMap 的错误。
这是一个缩小的实现,可用于说明和测试该问题:
import cats.effect.IO
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.concurrent.TrieMap
import cats.effect.unsafe.implicits.global
import java.util.concurrent.ConcurrentHashMap
object ThreadingError extends App:
val jobIdsAdded = (0L until 10000L).toList
for (_ <- jobIdsAdded.iterator) {
ParallelJobs.addJob(() => {})
}
while(ParallelJobs.count.get() < 10000L) {
print(s"${ParallelJobs.count.get()}\r")
Thread.sleep(200)
}
object ParallelJobs:
private val allCores = Runtime.getRuntime.availableProcessors()
private val availableCores = allCores - 1
private val assignedTillJobId: AtomicLong = AtomicLong(0L)
val jobsTrieMap: TrieMap[Long, () => Any] = TrieMap.empty[Long, () => Any]
val jobsConcurrentHashMap: ConcurrentHashMap[Long, () …Run Code Online (Sandbox Code Playgroud) 我偶然发现了一个意想不到的cats.effect.std.Mutex行为(它不适用于某些情况)。看来我错过了一些核心理解,Async但还没有找到根本原因。
进口:
\nimport cats.{Applicative, FlatMap}\nimport cats.effect.std.{Console, Mutex}\nimport cats.effect.*\nimport cats.implicits.*\nimport cats.syntax.all.*\n\nimport scala.concurrent.duration.*\nRun Code Online (Sandbox Code Playgroud)\n假设我们有一个服务
\n工作场景
\n class Service[F[_] : Async : FlatMap : Console](mutex: Mutex[F]) {\n def run(name: String): F[Unit] =\n for\n _ \xe2\x86\x90 Console[F].println(s"[$name] entered [run] within [${Thread.currentThread().getName}]")\n _ \xe2\x86\x90 mutex.lock.surround {\n for {\n _ \xe2\x86\x90 Console[F].println(s"[$name] entered [locked] within [${Thread.currentThread().getName}]")\n _ \xe2\x86\x90 Async[F].sleep(2.seconds)\n _ \xe2\x86\x90 Console[F].println(s"[$name] almost left [locked] within [${Thread.currentThread().getName}]")\n } yield ()\n }\n _ \xe2\x86\x90 Console[F].println(s"[$name] left [run] within [${Thread.currentThread().getName}]")\n …Run Code Online (Sandbox Code Playgroud)