我尝试使用以下代码http4s v0.19.0:
import cats.effect._
def usingHttp4s(uri: String, bearerToken: String)(implicit cs: ContextShift[IO]): String = {
import scala.concurrent.ExecutionContext
import org.http4s.client.dsl.io._
import org.http4s.headers._
import org.http4s.Method._
import org.http4s._
import org.http4s.client._
import org.http4s.client.middleware._
val blockingEC = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))
val middlewares = Seq(
RequestLogger[IO](logHeaders = true, logBody = true, redactHeadersWhen = _ => false)(_),
FollowRedirect[IO](maxRedirects = 5)(_)
)
val client = middlewares.foldRight(JavaNetClientBuilder(blockingEC).create[IO])(_.apply(_))
val req = GET(
Uri.unsafeFromString(uri),
Authorization(Credentials.Token(AuthScheme.Bearer, bearerToken))
)
client.expect[String](req).unsafeRunSync()
}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
[error] (run-main-0) org.http4s.client.UnexpectedStatus: unexpected HTTP status: 401 Unauthorized
[error] org.http4s.client.UnexpectedStatus: unexpected HTTP …Run Code Online (Sandbox Code Playgroud) 我正在尝试在使用 Doobie 将用户插入数据库的同一事务中发送电子邮件。
我知道我可以举IO到ConnectionIO通过使用Async[ConnectionIO].liftIO(catsIO)其中catsIO: IO[String]
但在我的代码,我不上工作IO,我使用F与约束,例如F[_]: Async
于是我可以代替F我自己的单子进行测试。
是否可以在不直接使用类型的情况下以某种方式提升F[String]进入? ConnectionIO[String]IO
这是我为 IO 类型找到的答案:Doobie and DB access composition within 1 transaction
我刚刚开始使用 scala,想要建立到我的数据库的连接。
(我的知识来源于https://www.scala-exercises.org/上的 scala/doobie 教程)
现在这是代码:
import doobie._
import doobie.implicits._
import cats.effect._
import cats.implicits._
import doobie.hikari._
...
val transactor: Resource[IO, HikariTransactor[IO]] =
for {
ce <- ExecutionContexts.fixedThreadPool[IO](32) // our connect EC
be <- Blocker[IO] // our blocking EC
xa <- HikariTransactor.newHikariTransactor[IO](
"org.h2.Driver", // driver classname
"jdbc:mysql://localhost:3306/libraries", // connect URL
"root", // username
"", // password
ce, // await connection here
be // execute JDBC operations here
)
} yield xa
Run Code Online (Sandbox Code Playgroud)
当我尝试构建我的代码时,我收到以下错误消息:
错误:(25, 53) 无法找到 ContextShift[cats.effect.IO] 的隐式值:
从效果库导入 ContextShift[cats.effect.IO]
如果使用 …
如果我想在诸如 http4s 之类的东西中使用 ReactiveMongo,我必须将 ReactiveMongo 返回的所有 Future 调用包装在 Cats IO 效果中,这样说是否正确?
概括地说,将 ReactiveMongo 合并到 http4s 中需要哪些步骤?
我刚刚开始我的 fs2 流冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),转换它并将结果写入两个不同的文件(基于某些谓词)。一些代码(来自https://github.com/typelevel/fs2),以及我的评论:
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
/* instead of the last line I want something like this:
.through(<write temperatures higher than 10 to one file, the rest to the other one>)
*/
}
Run Code Online (Sandbox Code Playgroud)
最有效的方法是什么?显而易见的解决方案是使用两个具有不同过滤器的流,但效率很低(将有两次通过)。
假设我有一组规则,其中有一个IO[Boolean]在运行时返回的验证函数。
case class Rule1() {
def validate(): IO[Boolean] = IO.pure(false)
}
case class Rule2() {
def validate(): IO[Boolean] = IO.pure(false)
}
case class Rule3() {
def validate(): IO[Boolean] = IO.pure(true)
}
val rules = List(Rule1(), Rule2(), Rule3())
Run Code Online (Sandbox Code Playgroud)
现在我必须迭代这些规则并查看“这些规则中的任何一个是否有效”,如果不有效则抛出异常!
for {
i <- rules.map(_.validate()).sequence
_ <- if (i.contains(true)) IO.unit else IO.raiseError(new RuntimeException("Failed"))
} yield ()
Run Code Online (Sandbox Code Playgroud)
上面代码片段的问题在于它试图评估所有规则!我真正想要的是在第一次验证时退出true。
不知道如何在 Scala 中使用猫效果来实现这一点。
这是一个玩具 Scala 程序,它从控制台读取 10 个数字并将每个数字加 1 打印出来:
import cats._
import cats.effect._
import cats.implicits._
object Main extends IOApp {
def printLine[A](x: A)(implicit show: Show[A]): IO[Unit] =
IO(println(show.show(x)))
def readLine(): IO[String] =
IO(scala.io.StdIn.readLine())
def readInts(n: Int): IO[List[Int]] =
List.fill(n)(readLine().map(_.toInt)).sequence
override def run(args: List[String]): IO[ExitCode] = {
for {
list <- readInts(10)
i <- list
_ <- printLine(i + 1)
} yield ExitCode.Success
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码不能编译。我得到:
[error] found : cats.effect.IO[cats.effect.ExitCode]
[error] required: scala.collection.GenTraversableOnce[?]
[error] _ <- printLine(i + 1)
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?
这或许曾多次被问过,但我找到的建议都没有帮助.
我有一个简单的Scala代码,生成长数取决于一些副作用.我在IO monad中包装东西,但根据最小功率原则,我实际上是在声明我的功能F[_]: Effect.现在代码不会编译,我不明白为什么,请建议可能出错的地方
import cats.effect.{Clock, Effect}
import cats.syntax.all._
import java.util.concurrent.TimeUnit
...
def generateId[F[_]: Effect](rid: Long)(implicit F: Effect[F], clock: Clock[F]): F[Long] =
for {
currentTimeNanos <- clock.realTime(TimeUnit.NANOSECONDS)
tid <- F.delay(Thread.currentThread().getId)
} yield
(tid << 40 /* */ & 0xFFFFFF0000000000L) |
(rid << 16 /* */ & 0x000000FFFFFF0000L) |
(currentTimeNanos & 0x000000000000FFFFL)
Run Code Online (Sandbox Code Playgroud)
[error] /.../package.scala:34:41: value flatMap is not a member of type parameter F[Long]
[error] currentTimeNanos <- clock.realTime(TimeUnit.NANOSECONDS)
[error] ^
[error] /.../package.scala:35:34: value map is not a member of …Run Code Online (Sandbox Code Playgroud) 使用 tagless final(不使用 IO,而是使用通用 F)我如何抽象出这样的东西:
def doSomething(s: String): IO[Unit] = ???
List("authMethods", "secretEngines", "plugins", "CAs", "common").parTraverse(doSomething)
Run Code Online (Sandbox Code Playgroud)
我能得到的最接近的是parTraverseN从 Concurrent 对象中使用,但我认为这将并发运行而不是并行运行(如在parallelism 中)。这也迫使我选择一个nwhere as parTraversenot。
列表的大小只是一个例子,它可能更大。doSomething是一个纯函数,它的多次执行可以毫无问题地并行运行。
理想情况下,鉴于doSomething返回,IO[Unit]我想抽象parTraverse_为F具有正确类型类实例的 。
我正在尝试使用 SignalRef 中断 fs2 流。我使用以下命令设置并运行流。流应在switch包含时运行,并在包含false时中断switchtrue
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
Run Code Online (Sandbox Code Playgroud)
然后我尝试中断流
switch.map(_.set(true).unsafeRunSync)
Run Code Online (Sandbox Code Playgroud)
然而,这股潮流仍在继续。在标准输出中我看到
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
Run Code Online (Sandbox Code Playgroud)
那么显然它没有切换到 true 吗?
cats-effect ×10
scala ×10
scala-cats ×6
fs2 ×2
curl ×1
doobie ×1
exists ×1
findfirst ×1
http4s ×1
http4s-circe ×1