我使用免费monad为ETL过程实现了一种简单的语言.当List用作数据获取和存储的输入和输出时,一切正常.但是,我正在使用异步库并使用Future[List]
常见的进口和定义
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.free.Free
import cats.free.Free._
sealed trait Ops[A]
type OpsF[A] = Free[Ops, A]
Run Code Online (Sandbox Code Playgroud)
与...合作 List
case class Fetch(offset: Int, amount: Int) extends Ops[List[Record]]
case class Store(recs: List[Record]) extends Ops[List[Response]]
def fetch(offset: Int, amount: Int): OpsF[List[Record]] =
liftF[Ops, List[Record]](Fetch(offset, amount))
def store(recs: List[Record]): OpsF[List[Response]] =
liftF[Ops, List[Response]](Store(recs))
def simpleEtl(offset: Int, amount: Int): Free[Ops, List[Response]] =
fetch(offset, amount).flatMap(r => store(r))
Run Code Online (Sandbox Code Playgroud)
没有合作 Future[List]
case class Fetch(offset: Int, amount: Int) extends Ops[Future[List[Record]]]
case class Store(recs: List[Record]) …Run Code Online (Sandbox Code Playgroud) 我第一次使用猫来解决代码出现的第一天,我想知道是否有可能改进.
给定update具有以下签名
的方法def update(i: Instruction): PosAndDir => PosAndDir
我想出来:
val state: State[PosAndDir, List[Unit]] = instructions.map(i => State.modify(update(i))).toList.sequenceU
val finalState = state.runS(PosAndDir(Pos(0, 0), North)).value
Run Code Online (Sandbox Code Playgroud)
并且
def update2(i: Instruction): State[PosAndDir, Option[Pos]] =
State.modify(update(i)).inspect(pad => if (i == Walk) Some(pad.pos) else None)
…
val state = instructions.map(update2).toList.sequenceU
val positions = state.runA(PosAndDir(Pos(0, 0), North)).value.flatten
Run Code Online (Sandbox Code Playgroud)
更确切地说,问题是:
.value(使用scalaz,它是透明的)?update2用理解来提高可读性?Applicative实例Seq(我知道scalaz中没有).?我一直在尝试使用更抽象的函数式编程概念,例如来自scle的typelevel/cats的概念.
在这个特定的情况下,我试图消除map(_.flatten)遍历一些Futures 后调用的需要.
使用标准库,它看起来像这样:
def stdExample[T](things: Seq[T]): Future[Seq[T]] = {
Future.traverse(things)(futureMaybeThings[T]).map(_.flatten)
}
def futureMaybeThings[T](thing: T): Future[Option[T]] = ???
Run Code Online (Sandbox Code Playgroud)
我尝试使用flatTraversetypelevel/cats,但我能得到的最好的是:
def catsExample[T](things: Seq[T]): Future[Seq[T]] = {
things.toList.flatTraverse(futureMaybeThings[T](_).map(_.toList))
}
Run Code Online (Sandbox Code Playgroud)
things.toList要求获得Traversable需要调用,我可以使用那个.
由于flatTraverse需要f为C => G[F[B]](其中两个C和B是T,G是Future和F是List),futureMaybeThings不匹配而不首先用所述结果map(_.toList).这最终比其他解决方案更糟糕.
它可以创建只上工作的功能Future和TraversableOnce(因为是隐式转换Option[A] => TraversableOnce[A]这样的实现可能是这样的:
def flatTraverse[A, B[_], C, M[X] <: TraversableOnceWithFlatten[X, M]](in: M[A]) …Run Code Online (Sandbox Code Playgroud) 我正在使用catslib.
使用他们的applicative functor实例(或者Cartesian,确切地说)组合两个列表很容易:
import cats._
import cats.implicits._
(List(23, 4), List(55, 56)).mapN(_ + _)
>> List(78, 79, 59, 60)
Run Code Online (Sandbox Code Playgroud)
但是,我似乎无法用两个函数做同样的事情:
val strLength: String => Int = _.length
(strLength, strLength).mapN(_ + _)
>> value mapN is not a member of (String => Int, String => Int)
Run Code Online (Sandbox Code Playgroud)
如果我明确地执行一些隐式转换,并且如果我创建一个类型别名以给编译器一个手,它确实有效:
type F[A] = Function1[String, A]
val doubleStrLength = catsSyntaxTuple2Cartesian[F, Int, Int]((strLength, strLength)).mapN(_ + _)
doubleStrLength("hello")
>> 10
Run Code Online (Sandbox Code Playgroud)
有更简单的方法吗?似乎过于冗长
编辑:如果你想玩它,我在这里创建一个工作表:https://scastie.scala-lang.org/dcastro/QhnD8gwEQEyfnr14g34d9g/2
我想使用Cats EitherT并OptionT处理该类型Future[Either[Error, Option[T]].假设有以下方法:
def findTeacher(id: Int): Future[Either[String, Option[Teacher]]]
def findSchool(teacher: Teacher): Future[Either[String, Option[School]]]
Run Code Online (Sandbox Code Playgroud)
现在,如果我想随后在for-comprehension中调用它们,我可以使用EitherT并OptionT喜欢这样:
def getSchoolByTeacherId(id: Int): Future[Either[String, Option[School]]] = {
val result = for {
maybeTeacher <- EitherT(findTeacher(id))
schoolF = maybeTeacher.map(findSchool).getOrElse(Future.successful(Right(None)))
school <- EitherT(schoolF)
} yield {
school
}
result.value
}
Run Code Online (Sandbox Code Playgroud)
我不知道是否有可能通过合并,使之更简洁,也许OptionT用EitherT?
这是我们想要IO并行执行3的示例
def test: Unit = {
val ioA = IO.shift *> IO(println("Running ioA"))
// ioA: cats.effect.IO[Unit] = <function1>
val ioB = IO.shift *> IO(println("Running ioB"))
// ioB: cats.effect.IO[Unit] = <function1>
val ioC = IO.shift *> IO(println("Running ioC"))
// ioC: cats.effect.IO[Unit] = <function1>
val program: IO[Unit] = (ioA, ioB, ioC).parMapN { (_, _, _) => () }
// program: cats.effect.IO[Unit] = <function1>
program.unsafeRunSync()
}
Run Code Online (Sandbox Code Playgroud)
第一个问题:如果IO.shift在这个例子中使用点怎么办?
第二个问题:如果我们有一个List的IO,我们希望在并行执行?我已经为这个任务创建了一个函数但我不知道这个东西是否已经存在于库中
def parallelize(ios: List[IO[Unit]]): IO[Unit] = {
ios.foldLeft(IO.pure(())) …Run Code Online (Sandbox Code Playgroud) 我试图通过cats.effect.IO例子来理解异步计算并得到一些误解.该unsafe方法unsafeRunAsync似乎异步运行底层效果(我希望ContextShift提供一些).但该方法看起来像这样:
final def unsafeRunAsync(cb: Either[Throwable, A] => Unit): Unit =
IORunLoop.start(this, cb)
Run Code Online (Sandbox Code Playgroud)
既没有ContextShift也没有ExecutionContext提供.这是一个非常简单的例子:
object TestIo extends App {
println(s"Main thread = ${Thread.currentThread().getName}")
val io = IO {
println(s"Effect thread = ${Thread.currentThread().getName}")
Thread.sleep(1000)
}
io.unsafeRunAsync(_ => println(s"Callback thread = ${Thread.currentThread().getName}"))
println(s"Finished")
}
Run Code Online (Sandbox Code Playgroud)
输出是
Main thread = main
Effect thread = main
Callback thread = main
Finished
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,这里的所有内容都是在主线程中 同步运行的.你能解释一下unsafeRunAsync吗?对我而言似乎是一样的unsafeRunSync.
如果我只是使用选项进行理解,一切都按预期进行:
val a = Some(1)
val b = None
val c = Some(3)
val r = for {
aa <- a
bb <- b
cc <- c
} yield aa + bb + cc
println(r) // None, because b is None
Run Code Online (Sandbox Code Playgroud)
但如何使用猫IO实现相同的行为?
import cats.effect.IO
// in reality this will be a methods with side effect
val a = Some(1)
val b = None
val c = Some(3)
val r = for {
_ <- IO{println("a"); a}
_ <- IO{println("b"); b} …Run Code Online (Sandbox Code Playgroud) 如何转换List[Either[String, Int]]为Either[List[String], List[Int]]使用类似于cats sequence的方法?例如, xs.sequence在以下代码中
import cats.implicits._
val xs: List[Either[String, Int]] = List(Left("error1"), Left("error2"))
xs.sequence
Run Code Online (Sandbox Code Playgroud)
返回,Left(error1)而不是required Left(List(error1, error2))。
凯文·赖特的答案表明
val lefts = xs collect {case Left(x) => x }
def rights = xs collect {case Right(x) => x}
if(lefts.isEmpty) Right(rights) else Left(lefts)
Run Code Online (Sandbox Code Playgroud)
哪个返回Left(List(error1, error2)),但是猫是否提供开箱即用的排序方式来收集所有剩余数据?
这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?
import fs2._
// import fs2._
def tk[F[_],O](n: Long): Pipe[F,O,O] = {
def go(s: Stream[F,O], n: Long): Pull[F,O,Unit] = {
s.pull.uncons.flatMap {
case Some((hd,tl)) =>
hd.size match {
case m if m <= n => Pull.output(hd) >> go(tl, n - m)
case m => Pull.output(hd.take(n.toInt)) >> Pull.done
}
case None => Pull.done
}
}
in => go(in,n).stream
}
// tk: [F[_], O](n: Long)fs2.Pipe[F,O,O]
Stream(1,2,3,4).through(tk(2)).toList
// res33: List[Int] = List(1, 2)
Run Code Online (Sandbox Code Playgroud)
如果我们go从另一个方法调用,它也是安全的吗?
def tk[F[_],O](n: …Run Code Online (Sandbox Code Playgroud)