小编Lev*_*sov的帖子

flatMap、flatTap、evalMap 和 evalTap 的区别

在函数流的 Scala fs2 库中:

我想明白之间的差别flatMapflatTapevalMapevalTap。它们似乎都执行相同的操作,即流值的转换。

有什么区别以及何时应该使用它们中的每一个?

functional-programming scala scala-cats fs2

15
推荐指数
1
解决办法
2477
查看次数

如何在 Doobie (Postgres) 中读/写时间戳

如何在 Doobie 中读/写时间戳?

我有一个包含时间戳字段的记录类。当我尝试将其写入数据库或使用 doobie 读取时,出现错误Cannot find or construct a Read instance for type

case class ExampleRecord(data: String, created_at: Timestamp)

val create = sql"create table if not exists example_ts (data TEXT NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)".update.run
val insert = Update[ExampleRecord]("insert into example_ts (data, created_at) values (?, ?)")
  .updateMany(List(
    ExampleRecord("one", Timestamp.valueOf(LocalDateTime.now())),
    ExampleRecord("two", Timestamp.valueOf(LocalDateTime.now()))
  ))
val select = sql"select data, created_at from example_ts".query[ExampleRecord].stream

val app = for {
  _ <- create.transact(xa).compile.drain
  _ <- insert.transact(xa).compile.drain
  _ <- …
Run Code Online (Sandbox Code Playgroud)

postgresql scala doobie

10
推荐指数
1
解决办法
2444
查看次数

如何解释Scala Cats / fs2中的堆栈安全性?

这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?

import fs2._
// import fs2._

def tk[F[_],O](n: Long): Pipe[F,O,O] = {
  def go(s: Stream[F,O], n: Long): Pull[F,O,Unit] = {
    s.pull.uncons.flatMap {
      case Some((hd,tl)) =>
        hd.size match {
          case m if m <= n => Pull.output(hd) >> go(tl, n - m)
          case m => Pull.output(hd.take(n.toInt)) >> Pull.done
        }
      case None => Pull.done
    }
  }
  in => go(in,n).stream
}
// tk: [F[_], O](n: Long)fs2.Pipe[F,O,O]

Stream(1,2,3,4).through(tk(2)).toList
// res33: List[Int] = List(1, 2)
Run Code Online (Sandbox Code Playgroud)

如果我们go从另一个方法调用,它也是安全的吗?

def tk[F[_],O](n: …
Run Code Online (Sandbox Code Playgroud)

functional-programming scala tail-recursion scala-cats fs2

7
推荐指数
1
解决办法
464
查看次数

如何使用 fs2-kafka 读取嵌入式 kafka

我正在使用fs2-kafka来读取嵌入的 kafka

我使用创建嵌入式 kafka withRunningKafkaOnFoundPort,创建主题并发布一些消息。然而,当我尝试用 fs2-kafka 读回它时,我得到一个 NullPointerException。我已经隔离了一个测试用例,代码如下。

这是我的代码:

import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.concurrent.ExecutionContext

class KafkaSuite extends FunSuite with EmbeddedKafka {

  val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
  implicit val contextShift = IO.contextShift(singleThreadExecutor)
  implicit val timer = IO.timer(singleThreadExecutor)

  val topic = "example"
  val partition = 0
  val clientId = "client"

  test("works") {
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka fs2 embedded-kafka cats-effect

3
推荐指数
1
解决办法
1286
查看次数