将元素从外部推送到 fs2 中的反应流

Vla*_*eev 8 scala reactive-streams akka-stream fs2

我有一个外部(即,我无法更改它)Java API,如下所示:

public interface Sender {
    void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}
Run Code Online (Sandbox Code Playgroud)

CustomBuffer是一个GraphStage与库提供的非常相似的自定义Buffer但根据我们的特定需求量身定制;对于这个特定问题,这可能无关紧要。

如您所见,与来自非流代码的流交互非常简单 - 上的!方法ActorRef特征是异步的,不需要任何额外的机制来调用。然后通过整个反应式管道处理发送给actor的每个事件。而且,由于 akka-http 的实现方式,我什至免费获得了连接池,因此打开到服务器的连接不超过一个。

但是,我找不到正确使用 FS2 执行相同操作的方法。即使放弃缓冲(我可能需要编写一个自定义Pipe实现来完成我们需要的其他事情)和 HTTP 连接池的问题,我仍然坚持一个更基本的事情 - 即,如何将数据推送到“来自外部”的反应流。

我能找到的所有教程和文档都假设整个程序发生在某个效果上下文中,通常是IO. 这不是我的情况 - send()Java 库在未指定的时间调用该方法。因此,我不能将所有内容都保留在一个IO操作中,我必须在方法中完成“推送”操作send(),并将反应流作为一个单独的实体,因为我想聚合事件并希望汇集 HTTP 连接(我相信自然地与反应流相关)。

我假设我需要一些额外的数据结构,比如Queue. fs2 确实有某种fs2.concurrent.Queue,但同样,所有文档都显示了如何在单个IO上下文中使用它,所以我假设做类似的事情

val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)

然后queue在流定义内部使用,然后在send()方法内部单独使用,进一步unsafeRun调用:

val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event) {
  queue.enqueue(e).unsafeRunSync()
}
Run Code Online (Sandbox Code Playgroud)

不是正确的方法,很可能甚至不起作用。

所以,我的问题是,如何正确使用 fs2 来解决我的问题?

sim*_*djo 1

我对该库没有太多经验,但它应该看起来像这样:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue

case class Event(id: Int)

class JavaProducer{

  new Thread(new Runnable {
    override def run(): Unit = {
      var id = 0
      while(true){
        Thread.sleep(1000)
        id += 1
        send(Event(id))
      }
    }
  }).start()

  def send(event: Event): Unit ={
    println(s"Original producer prints $event")
  }
}

class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
  override def send(event: Event): Unit = {
    println(s"Hacked producer pushes $event")
    queue.enqueue1(event).unsafeRunSync()
    println(s"Hacked producer pushes $event - Pushed")
  }

}

object Test extends IOApp{
  override def run(args: List[String]): IO[ExitCode] = {
    val x: IO[Unit] = for {
      queue <- Queue.unbounded[IO, Event]
      _ = new HackedProducer(queue)
      done <- queue.dequeue.map(ev => {
        println(s"Got $ev")
      }).compile.drain
    } yield done
    x.map(_ => ExitCode.Success)
  }

}
Run Code Online (Sandbox Code Playgroud)