MeB*_*Guy 8 java stream akka akka-stream
我需要调用上游服务(Azure Blob 服务)将数据推送到 OutputStream,然后我需要通过 akka 将其转回客户端。如果没有 akka(只有 servlet 代码),我只会获取 ServletOutputStream 并将其传递给 azure 服务的方法。
我可以尝试偶然发现的最接近的,显然这是错误的,是这样的
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Run Code Online (Sandbox Code Playgroud)
这个想法是我正在调用上游服务以通过调用 blobClient.download(os); 来获取填充的输出流;
似乎 lambda 函数被调用并返回,但随后它失败了,因为没有数据或其他东西。好像我不应该让那个 lambda 函数做这项工作,但也许返回一些做这项工作的对象?没有把握。
如何做到这一点?
在这种情况下,OutputStream是 的“物化值”,Source并且只有在流运行(或“物化”到正在运行的流中)时才会创建它。运行它是不受你控制的,因为你将其交给SourceAkka HTTP,它稍后会实际运行你的源代码。
.mapMaterializedValue(matval -> ...)通常用于转换物化值,但由于它是作为物化的一部分调用的,因此您可以使用它来产生副作用,例如在消息中发送 matval,就像您已经发现的那样,不一定有任何问题即使它看起来很时髦。重要的是要理解,在 lambda 完成之前,流不会完成其具体化并开始运行。download()如果阻塞而不是在不同线程上分叉一些工作并立即返回,这意味着出现问题。
然而,还有另一种解决方案:Source.preMaterialize()它物化源并为您提供Pair物化值和Source可用于使用已启动源的新值:
Pair<OutputStream, Source<ByteString, NotUsed>> pair =
StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();
Run Code Online (Sandbox Code Playgroud)
请注意,在您的代码中还需要考虑一些其他事情,最重要的是,如果调用blobClient.download(os)在完成之前一直阻塞并且您从参与者调用该调用,在这种情况下,您必须确保您的参与者不会让调度程序挨饿并停止应用程序中的其他参与者无法执行(请参阅 Akka 文档:https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management)。