在函数流的 Scala fs2 库中:
我想明白之间的差别flatMap,flatTap,evalMap和evalTap。它们似乎都执行相同的操作,即流值的转换。
有什么区别以及何时应该使用它们中的每一个?
如何在 Doobie 中读/写时间戳?
我有一个包含时间戳字段的记录类。当我尝试将其写入数据库或使用 doobie 读取时,出现错误Cannot find or construct a Read instance for type。
case class ExampleRecord(data: String, created_at: Timestamp)
val create = sql"create table if not exists example_ts (data TEXT NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)".update.run
val insert = Update[ExampleRecord]("insert into example_ts (data, created_at) values (?, ?)")
.updateMany(List(
ExampleRecord("one", Timestamp.valueOf(LocalDateTime.now())),
ExampleRecord("two", Timestamp.valueOf(LocalDateTime.now()))
))
val select = sql"select data, created_at from example_ts".query[ExampleRecord].stream
val app = for {
_ <- create.transact(xa).compile.drain
_ <- insert.transact(xa).compile.drain
_ <- …Run Code Online (Sandbox Code Playgroud) 这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?
import fs2._
// import fs2._
def tk[F[_],O](n: Long): Pipe[F,O,O] = {
def go(s: Stream[F,O], n: Long): Pull[F,O,Unit] = {
s.pull.uncons.flatMap {
case Some((hd,tl)) =>
hd.size match {
case m if m <= n => Pull.output(hd) >> go(tl, n - m)
case m => Pull.output(hd.take(n.toInt)) >> Pull.done
}
case None => Pull.done
}
}
in => go(in,n).stream
}
// tk: [F[_], O](n: Long)fs2.Pipe[F,O,O]
Stream(1,2,3,4).through(tk(2)).toList
// res33: List[Int] = List(1, 2)
Run Code Online (Sandbox Code Playgroud)
如果我们go从另一个方法调用,它也是安全的吗?
def tk[F[_],O](n: …Run Code Online (Sandbox Code Playgroud) 我使用创建嵌入式 kafka withRunningKafkaOnFoundPort,创建主题并发布一些消息。然而,当我尝试用 fs2-kafka 读回它时,我得到一个 NullPointerException。我已经隔离了一个测试用例,代码如下。
这是我的代码:
import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.concurrent.ExecutionContext
class KafkaSuite extends FunSuite with EmbeddedKafka {
val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
implicit val contextShift = IO.contextShift(singleThreadExecutor)
implicit val timer = IO.timer(singleThreadExecutor)
val topic = "example"
val partition = 0
val clientId = "client"
test("works") {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort …Run Code Online (Sandbox Code Playgroud)