我需要akka.stream.scaladsl.Source[T, Unit]从一个集合中创建一个Future[T].
例如,有一组期货返回整数,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
Run Code Online (Sandbox Code Playgroud)
如何创建一个
val source: Source[Int, Unit] = ???
Run Code Online (Sandbox Code Playgroud)
从中.
我不能使用Future.sequence组合器,从那以后我会等待每个未来完成之后从源头获取任何东西.我想在任何未来完成后立即以任何顺序获得结果.
我知道这Source是一个纯粹的功能API,它不应该以某种方式实现它之前运行任何东西.所以,我的想法是使用Iterator(懒惰)来创建一个源:
Source { () =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
Run Code Online (Sandbox Code Playgroud)
但这将是未来的来源,而不是实际价值.我也可以阻止next使用,Await.result(future)但我不确定哪个胎面池的线程将被阻止.这也将顺序调用期货,而我需要并行执行.
更新2:事实证明有一种更简单的方法(感谢Viktor Klang):
Source(futures).mapAsync(1)(identity)
Run Code Online (Sandbox Code Playgroud)
更新:这是基于@sschaef回答我得到的:
def futuresToSource[T](futures: …Run Code Online (Sandbox Code Playgroud) 我正在学习与Akka流一起工作,并且非常喜欢它,但物化部分对我来说仍然有些神秘.
...通过调用池客户端流实现的HostConnectionPool实例上的shutdown()来触发特定池的立即关闭
如何获取HostConnectionPool实例?
更重要的是,我想了解一般如何访问物化值并执行某些操作或从中检索信息.
提前感谢任何文档指针或解释.
假设我已经设置了一个任意复杂的Flow[HttpRequest, HttpResponse, Unit].
我已经可以使用所述流来处理传入的请求了
Http().bindAndHandle(flow, "0.0.0.0", 8080)
Run Code Online (Sandbox Code Playgroud)
现在我想添加日志记录,利用一些现有的指令,比如logRequestResult("my-service"){...}
有没有办法将这个指令与我的流程结合起来?我想我正在寻找另一个指令,一些类似的东西
def completeWithFlow(flow: Flow): Route
Run Code Online (Sandbox Code Playgroud)
这有可能吗?
注意:logRequestResult就是一个例子,我的问题适用于任何可能有用的指令.
我是Akka Streams和Akka HTTP的新手.
我想生成一个简单的HTTP服务器,它可以从文件夹的内容生成一个zip文件并将其发送到客户端.
org.zeroturnaround.zip.ZipUtil使创建zip文件的任务变得非常简单,但它需要一个outputStream.
这是我的解决方案(用Scala语言编写):
val os = new ByteArrayOutputStream()
ZipUtil.pack(myFolder, os)
HttpResponse(entity = HttpEntity(
MediaTypes.`application/zip`,
os.toByteArray))
Run Code Online (Sandbox Code Playgroud)
此解决方案有效,但将所有内容保留在内存中,因此无法扩展.
我认为解决这个问题的关键是使用这个:
val source = StreamConverters.asOutputStream()
Run Code Online (Sandbox Code Playgroud)
但不知道如何使用它.:-(
有什么帮助吗?
akka-stream文档中有这样的说明,说明如下:
......可重用的流描述不能绑定到"实时"资源,任何与此类资源的连接或分配必须推迟到实现时间."实时"资源的示例是已经存在的TCP连接,多播发布者等; ...
关于这个说明,我有几个问题:
Thread?ActorRef那个ActorSystem用过的现有的ActorFlowMaterializer怎么样?PushPullStage但不是在一个创建函数的构造函数中分配它是否安全FlowGraph?在我的场景中,客户端发送"再见"websocket消息,我需要在服务器端关闭先前建立的连接.
来自akka-http 文档:
通过从服务器逻辑中取消传入连接流(例如,将其下游连接到Sink.cancelled,将其上游连接到Source.empty),可以关闭连接.也可以通过取消IncomingConnection源连接来关闭服务器的套接字.
但是我不清楚如何考虑到这一点Sink并Source在协商新连接时设置一次:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ?
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ?
reject(ExpectedWebsocketRequestRejection)
}
}
Run Code Online (Sandbox Code Playgroud) 是否有一种Source以特殊方式处理Akka流的第一个元素的惯用方法?我现在拥有的是:
var firstHandled = false
source.map { elem =>
if(!firstHandled) {
//handle specially
firstHandled = true
} else {
//handle normally
}
}
Run Code Online (Sandbox Code Playgroud)
谢谢
我已经构建了一个定义流的akka图.我的目标是重新格式化我将来的响应并将其保存到文件中.流程可以概述如下:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
val merger = builder.add(Merge[Future[Map[String, String]]](6))
val fileSink = FileIO.toPath(outputPath, options)
val ignoreSink = Sink.ignore
val in = Source(seeds)
in ~> balancer.in
for (i <- Range(0,6)) {
balancer.out(i) ~>
wikiFlow.async ~>
// This maps to a Future[Map[String, String]]
Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
merger
}
merger.out ~>
// When we merge we need to map our Map to a file
Flow[Future[Map[String, String]]].map((d) => …Run Code Online (Sandbox Code Playgroud) 我有一个外部(即,我无法更改它)Java API,如下所示:
public interface Sender {
void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)
我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。
使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def …Run Code Online (Sandbox Code Playgroud) 我需要调用上游服务(Azure Blob 服务)将数据推送到 OutputStream,然后我需要通过 akka 将其转回客户端。如果没有 akka(只有 servlet 代码),我只会获取 ServletOutputStream 并将其传递给 azure 服务的方法。
我可以尝试偶然发现的最接近的,显然这是错误的,是这样的
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Run Code Online (Sandbox Code Playgroud)
这个想法是我正在调用上游服务以通过调用 blobClient.download(os); 来获取填充的输出流;
似乎 lambda 函数被调用并返回,但随后它失败了,因为没有数据或其他东西。好像我不应该让那个 lambda 函数做这项工作,但也许返回一些做这项工作的对象?没有把握。
如何做到这一点?