使用 Alpakka S3 连接器的多个下载请求

zue*_*egg 5 scala amazon-s3 akka akka-stream alpakka

我正在尝试使用Alpakka S3 连接器执行以下操作:

我使用的代码是这样的:

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")

val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
    case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))

source
    .via(Archive.zip())
    .to(s3Sink)
    .run()
Run Code Online (Sandbox Code Playgroud)

但是,这会导致以下错误:

Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
Run Code Online (Sandbox Code Playgroud)

我怀疑这是因为 S3 连接器使用的底层 Akka Http期望在移动到下一个下载响应之前消耗每个下载响应,但我无法在不引入等待/延迟的情况下以合理的方式处理这个问题。我尝试使用队列bufferSize = 1,但这也不起作用。

我对 Akka 和 Akka Streams 相当陌生。

Lás*_*oek 1

S3.download让我们通过剖析:的返回类型来分解这里发生的情况Source[Optional[JPair[Source[ByteString, NotUsed], ObjectMetadata]], NotUsed]

外部Source代表未完成的请求。Optional如果在存储桶中找不到该文件,则该文件为空。如果存在,它包含Pair另一个Source代表文件字节内容的 ,以及ObjectMetadata代表您正在下载的文件的元数据的 。

违反直觉的是,它通常被表示为某些流操作蓝图的冷酷、无状态、可共享的部分,只有在实现Source后才会变得栩栩如生。对于外部来说,情况就是如此。然而内心却一反常态,瞬间“热”起来。一旦外部物化并发出一个项目,该项目就代表一个开放的 HTTP 连接,您应该在(默认情况下)1 秒内开始使用该连接,否则会引发错误。SourceSourceSourceResponse entity was not subscribed

在原来的问题中,Source.combine是用Merge(_)策略调用的,这会导致并行物化。Archive.zip将按顺序处理文件,但如果完全消耗Source[ByteString]它收到的第一个文件花费的时间超过 1 秒,则第二个请求将在其时间到来之前超时。

确保这种情况不会发生的一种万无一失的方法是在Source将其移交给舞台中的下一个项目之前消耗掉整个内部。考虑:

Source(1 to 10)
  .flatMapMerge(4, i => S3.download("my-s3-bucket", s"random$i.png")
    .log("started file download")
    .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
    .flatMapConcat {
      case Some((s, m)) =>
        // for demo purposes, make sure individual downloads take >1 second
        s.delay(2.seconds, DelayOverflowStrategy.backpressure)
          // read entire file contents into a single ByteString
          .reduce(_ ++ _)
          .map(bs => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), Source.single(bs)))
    })
  .log("completed file download")
  .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
  .via(Archive.zip())
  .to(s3Sink)
  .run()
Run Code Online (Sandbox Code Playgroud)

此(经过测试!)代码最多同时下载 4 个文件(第一个参数flatMapMerge)。请注意该步骤如何在reduce将响应传递到 之前读取内存中的整个响应Archive.zip()。这并不理想,但对于小文件来说可能是可以接受的。