标签: fs2

flatMap、flatTap、evalMap 和 evalTap 的区别

在函数流的 Scala fs2 库中:

我想明白之间的差别flatMapflatTapevalMapevalTap。它们似乎都执行相同的操作,即流值的转换。

有什么区别以及何时应该使用它们中的每一个?

functional-programming scala scala-cats fs2

15
推荐指数
1
解决办法
2477
查看次数

如何在 fs2 中“拆分”流?

我想做这样的事情:

def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) = 
  (stream, stream.map(split)
Run Code Online (Sandbox Code Playgroud)

但这不起作用,因为它从源头“拉”了两次 - 当我同时排出streamstream.map(split). 我如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于“推送”的模型,这样我就不会拉两次?

scala fs2

9
推荐指数
1
解决办法
1146
查看次数

将元素从外部推送到 fs2 中的反应流

我有一个外部(即,我无法更改它)Java API,如下所示:

public interface Sender {
    void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream fs2

8
推荐指数
1
解决办法
953
查看次数

如何解释Scala Cats / fs2中的堆栈安全性?

这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?

import fs2._
// import fs2._

def tk[F[_],O](n: Long): Pipe[F,O,O] = {
  def go(s: Stream[F,O], n: Long): Pull[F,O,Unit] = {
    s.pull.uncons.flatMap {
      case Some((hd,tl)) =>
        hd.size match {
          case m if m <= n => Pull.output(hd) >> go(tl, n - m)
          case m => Pull.output(hd.take(n.toInt)) >> Pull.done
        }
      case None => Pull.done
    }
  }
  in => go(in,n).stream
}
// tk: [F[_], O](n: Long)fs2.Pipe[F,O,O]

Stream(1,2,3,4).through(tk(2)).toList
// res33: List[Int] = List(1, 2)
Run Code Online (Sandbox Code Playgroud)

如果我们go从另一个方法调用,它也是安全的吗?

def tk[F[_],O](n: …
Run Code Online (Sandbox Code Playgroud)

functional-programming scala tail-recursion scala-cats fs2

7
推荐指数
1
解决办法
464
查看次数

如何将scala fs2流转换为字符串?

我想知道如何将 Scala fs2 Stream 转换为字符串,来自 fs2 github 自述文件示例:

def converter[F[_]](implicit F: Sync[F]): F[Unit] = {
  val path = "/Users/lorancechen/version_control_project/_unlimited-works/git-server/src/test/resources"

  io.file.readAll[F](Paths.get(s"$path/fs.txt"), 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(s"$path/fs-output.txt")))
    .compile.drain

}

// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)

如何将结果获取到字符串而不是另一个文件?

scala fs2

6
推荐指数
2
解决办法
2719
查看次数

如何在 FS2 中使用分类器函数对对象进行分组?

我有一个 unordered 流measurements,我想将其分组为固定大小的批次,以便以后可以有效地保留它们:

val measurements = for {
  id <- Seq("foo", "bar", "baz")
  value <- 1 to 5
} yield (id, value)

fs2.Stream.emits(scala.util.Random.shuffle(measurements)).toVector
Run Code Online (Sandbox Code Playgroud)

也就是说,而不是:

(bar,4)
(foo,5)
(baz,3)
(baz,5)
(baz,4)
(foo,2)
(bar,2)
(foo,4)
(baz,1)
(foo,1)
(foo,3)
(bar,1)
(bar,5)
(bar,3)
(baz,2)
Run Code Online (Sandbox Code Playgroud)

对于等于的批量大小,我希望具有以下结构3

(bar,[4,2,1])
(foo,[5,2,4])
(baz,[3,5,4])
(baz,[1,2])
(foo,[1,3])
(bar,[5,3])
Run Code Online (Sandbox Code Playgroud)

在 FS2 中是否有一种简单、惯用的方法来实现这一点?我知道有一个groupAdjacentBy函数,但这只会考虑相邻的项目。

我现在在0.10.5

functional-programming scala stream fs2

6
推荐指数
1
解决办法
650
查看次数

使用fs2.Stream分组事件

我的事件流如下:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...
Run Code Online (Sandbox Code Playgroud)

我想将在一分钟内(即每分钟0秒到59秒)收到的事件分组。这听起来很简单fs2

val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}
Run Code Online (Sandbox Code Playgroud)

问题在于分组功能不纯。它使用currentTimeMillis。我可以这样工作:

stream.evalMap(t => IO(System.currentTimeMillis(), t))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))
Run Code Online (Sandbox Code Playgroud)

事实是,我想避免使用元组添加笨拙的样板。还有其他解决方案吗?

或者在这种情况下使用不纯函数还不错吗?

functional-programming scala stream fs2

6
推荐指数
1
解决办法
125
查看次数

FS2 - How to route an element to a specific nested stream/pipe?

I want to run N nested streams/pipes in parallel and send each element to only one of the nested streams. Balance allows me to do this but I want to route elements with the same "key" to the same nested stream or pipe.

I can't see any functions to do this so I wrote a basic POC which broadcasts each element to every stream. The stream/pipe then filters only the elements it should handle (see below). This seems quite inefficient, …

scala scala-cats fs2

6
推荐指数
0
解决办法
437
查看次数

流库中的拉模型与推模型中更容易实现哪些操作(反之亦然)?

Monix的作者说将Monix与FS2进行比较

FS2更好的地方:

  • 生产者和消费者之间的沟通模型是基于拉动的,有时使实施新运营商变得更加容易

Monix更好的地方:

  • 生产者和消费者之间的沟通模型是基于推送的(带有背压),这使其本质上更加有效

很少有问题出现:

  • 在基于拉的模型中,哪些操作更容易实现?
  • 是否存在更难通过这种方式实现的操作?
  • 为什么基于拉的方法本质上比较慢?

scala reactive-streams monix fs2

6
推荐指数
0
解决办法
219
查看次数

提高涉及文件转换的 fs2 流的性能

我有这样的东西(这是来自https://github.com/typelevel/fs2的一个例子,有我的补充,我用评论标记):

import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap  { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .balanceAvailable // my addition
      .map ( worker => // my addition
        worker // my addition
          .through(text.utf8Decode)
          .through(text.lines)
          .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
          .map(line => fahrenheitToCelsius(line.toDouble).toString)
          .intersperse("\n")
          .through(text.utf8Encode)
          .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      ) // my addition
      .take(4).parJoinUnbounded // my addition
  } …
Run Code Online (Sandbox Code Playgroud)

scala fs2 cats-effect

6
推荐指数
1
解决办法
392
查看次数