标签: akka-stream

在akka-stream中如何从期货集合中创建无序的来源

我需要akka.stream.scaladsl.Source[T, Unit]从一个集合中创建一个Future[T].

例如,有一组期货返回整数,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
Run Code Online (Sandbox Code Playgroud)

如何创建一个

val source: Source[Int, Unit] = ???
Run Code Online (Sandbox Code Playgroud)

从中.

我不能使用Future.sequence组合器,从那以后我会等待每个未来完成之后从源头获取任何东西.我想在任何未来完成后立即以任何顺序获得结果.

我知道这Source是一个纯粹的功能API,它不应该以某种方式实现它之前运行任何东西.所以,我的想法是使用Iterator(懒惰)来创建一个源:

Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}
Run Code Online (Sandbox Code Playgroud)

但这将是未来的来源,而不是实际价值.我也可以阻止next使用,Await.result(future)但我不确定哪个胎面池的线程将被阻止.这也将顺序调用期货,而我需要并行执行.

更新2:事实证明有一种更简单的方法(感谢Viktor Klang):

Source(futures).mapAsync(1)(identity)
Run Code Online (Sandbox Code Playgroud)

更新:这是基于@sschaef回答我得到的:

def futuresToSource[T](futures: …
Run Code Online (Sandbox Code Playgroud)

scala future reactive-streams akka-stream

9
推荐指数
2
解决办法
3012
查看次数

Akka-streams - 如何访问流的物化值

我正在学习与Akka流一起工作,并且非常喜欢它,但物化部分对我来说仍然有些神秘.

引自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api

...通过调用池客户端流实现的HostConnectionPool实例上的shutdown()来触发特定池的立即关闭

如何获取HostConnectionPool实例?

更重要的是,我想了解一般如何访问物化值并执行某些操作或从中检索信息.

提前感谢任何文档指针或解释.

scala akka-stream akka-http

9
推荐指数
1
解决办法
2783
查看次数

akka-http:完整的流程请求

假设我已经设置了一个任意复杂的Flow[HttpRequest, HttpResponse, Unit].

我已经可以使用所述流来处理传入的请求了

Http().bindAndHandle(flow, "0.0.0.0", 8080)
Run Code Online (Sandbox Code Playgroud)

现在我想添加日志记录,利用一些现有的指令,比如logRequestResult("my-service"){...} 有没有办法将这个指令与我的流程结合起来?我想我正在寻找另一个指令,一些类似的东西

def completeWithFlow(flow: Flow): Route
Run Code Online (Sandbox Code Playgroud)

这有可能吗?

注意:logRequestResult就是一个例子,我的问题适用于任何可能有用的指令.

akka-stream akka-http

9
推荐指数
1
解决办法
1096
查看次数

如何使用Akka HTTP通过输出流生成内容

我是Akka Streams和Akka HTTP的新手.

我想生成一个简单的HTTP服务器,它可以从文件夹的内容生成一个zip文件并将其发送到客户端.

org.zeroturnaround.zip.ZipUtil使创建zip文件的任务变得非常简单,但它需要一个outputStream.

这是我的解决方案(用Scala语言编写):

            val os = new ByteArrayOutputStream()
            ZipUtil.pack(myFolder, os)
            HttpResponse(entity = HttpEntity(
                MediaTypes.`application/zip`,
                os.toByteArray))
Run Code Online (Sandbox Code Playgroud)

此解决方案有效,但将所有内容保留在内存中,因此无法扩展.

我认为解决这个问题的关键是使用这个:

val source = StreamConverters.asOutputStream()
Run Code Online (Sandbox Code Playgroud)

但不知道如何使用它.:-(

有什么帮助吗?

scala akka akka-stream akka-http

9
推荐指数
2
解决办法
1548
查看次数

Akka Stream流量描述中的实时资源

akka-stream文档中有这样的说明,说明如下:

......可重用的流描述不能绑定到"实时"资源,任何与此类资源的连接或分配必须推迟到实现时间."实时"资源的示例是已经存在的TCP连接,多播发布者等; ...

关于这个说明,我有几个问题:

  • 除了这两个例子,还有什么其他资源可以作为直播?
    • 任何无法安全(深度)复制的东西?喜欢Thread
    • 我是否还应该避免分享任何不是线程安全的东西?
  • ActorRef那个ActorSystem用过的现有的ActorFlowMaterializer怎么样?
  • 如何在实现时间之前推迟分配?例如,在一个PushPullStage但不是在一个创建函数的构造函数中分配它是否安全FlowGraph

java scala akka akka-stream

8
推荐指数
1
解决办法
307
查看次数

从服务器关闭akka-http websocket连接

在我的场景中,客户端发送"再见"websocket消息,我需要在服务器端关闭先前建立的连接.

来自akka-http 文档:

通过从服务器逻辑中取消传入连接流(例如,将其下游连接到Sink.cancelled,将其上游连接到Source.empty),可以关闭连接.也可以通过取消IncomingConnection源连接来关闭服务器的套接字.

但是我不清楚如何考虑到这一点SinkSource在协商新连接时设置一次:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ?
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ?
      reject(ExpectedWebsocketRequestRejection)
  }
}
Run Code Online (Sandbox Code Playgroud)

scala akka-stream akka-http

8
推荐指数
1
解决办法
2071
查看次数

特别处理Akka流的第一个元素

是否有一种Source以特殊方式处理Akka流的第一个元素的惯用方法?我现在拥有的是:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }
Run Code Online (Sandbox Code Playgroud)

谢谢

scala akka akka-stream

8
推荐指数
1
解决办法
1382
查看次数

你如何处理Akka Flow的期货?

我已经构建了一个定义流的akka​​图.我的目标是重新格式化我将来的响应并将其保存到文件中.流程可以概述如下:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
      val merger = builder.add(Merge[Future[Map[String, String]]](6))
      val fileSink = FileIO.toPath(outputPath, options)
      val ignoreSink = Sink.ignore
      val in = Source(seeds)
      in ~> balancer.in
      for (i <- Range(0,6)) {
        balancer.out(i) ~>
          wikiFlow.async ~>
          // This maps to a Future[Map[String, String]]
          Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
          merger
      }

      merger.out ~>
      // When we merge we need to map our Map to a file
      Flow[Future[Map[String, String]]].map((d) => …
Run Code Online (Sandbox Code Playgroud)

scala future akka akka-stream

8
推荐指数
1
解决办法
4462
查看次数

将元素从外部推送到 fs2 中的反应流

我有一个外部(即,我无法更改它)Java API,如下所示:

public interface Sender {
    void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream fs2

8
推荐指数
1
解决办法
953
查看次数

将 Akka 流传递给上游服务以进行填充

我需要调用上游服务(Azure Blob 服务)将数据推送到 OutputStream,然后我需要通过 akka 将其转回客户端。如果没有 akka(只有 servlet 代码),我只会获取 ServletOutputStream 并将其传递给 azure 服务的方法。

我可以尝试偶然发现的最接近的,显然这是错误的,是这样的

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Run Code Online (Sandbox Code Playgroud)

这个想法是我正在调用上游服务以通过调用 blobClient.download(os); 来获取填充的输出流;

似乎 lambda 函数被调用并返回,但随后它失败了,因为没有数据或其他东西。好像我不应该让那个 lambda 函数做这项工作,但也许返回一些做这项工作的对象?没有把握。

如何做到这一点?

java stream akka akka-stream

8
推荐指数
1
解决办法
271
查看次数

标签 统计

akka-stream ×10

scala ×8

akka ×5

akka-http ×4

future ×2

java ×2

reactive-streams ×2

fs2 ×1

stream ×1