Spring WebFlux Webclient 接收应用程序/八位字节流文件作为 Mono

dja*_*son 2 kotlin spring-webflux

我正在 Kotlin 中制作一个小型 Spring WebFlux 应用程序的原型。此应用程序需要从远程 REST 端点获取 tar 存档并将其存储在本地磁盘上。听起来很简单。

我首先创建了一个集成测试,该测试启动 spring 服务器和另一个 WebFlux 服务器,该服务器具有一个模拟 REST 端点,该端点为 tar 存档提供服务。

测试应该是这样的:

1)应用程序:获取 mock-server/archive

2) 模拟服务器:响应状态为 200,主体中的 tar 存档作为类型附件

3)app:阻塞直到收到所有字节,然后解压并使用文件

我遇到的问题是,当我尝试将字节收集到ByteArray应用程序的a 中时,它会永远阻塞。

我的mock-server/archive路线到以下功能:

fun serveArchive(request: ServerRequest): Mono<ServerResponse> {
    val tarFile = FileSystemResource(ARCHIVE_PATH)
    assert(tarFile.exists() && tarFile.isFile && tarFile.contentLength() != 0L)
    return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .contentLength(tarFile.contentLength())
            .header("Content-Disposition", "attachment; filename=\"$ARCHIVE_FNAME\"")
            .body(fromResource(tarFile))
}
Run Code Online (Sandbox Code Playgroud)

然后我的应用程序使用以下内容调用它:

private fun retrieveArchive {
    client.get().uri(ARCHIVE_URL).accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchange()
            .flatMap { response ->
                storeArchive(response.bodyToMono())
            }.subscribe()
}

private fun storeArchive(archive: Mono<ByteArrayResource>): Mono<Void> {
    val archiveContentBytes = archive.block() // <- this blocks forever
    val archiveContents = TarArchiveInputStream(archiveContentBytes.inputStream)
    // read archive
}
Run Code Online (Sandbox Code Playgroud)

我已经看到如何最好地从 Spring WebClient 的 ClientResponse 获取字节数组?这就是为什么我试图使用ByteArrayResource.

当我逐步完成所有操作时,我发现这serveArchive似乎在起作用(断言语句说我传递的文件存在并且其中有一些字节)。在retrieveArchive我得到一个 200 并且可以看到所有合适的信息.headers(content-type,content-length 都很好看)。当我开始storeArchive尝试从 Mono 中检索字节时block,它会永远阻塞。

我完全不知道如何调试这样的东西。

Dmi*_*try 5

您只需要从 返回转换后的主体,flatMap以便从 转换Mono<T>T

client.get().uri(ARCHIVE_URL).accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchange()
            .flatMap { response ->
                response.bodyToMono(ByteArrayResource::class.java)
            }
            .map { archiveContentBytes ->
                archiveContentBytes.inputStream
            }
            .doOnSuccess { inputStream ->
                //here is you code to do anything with the inputStream
                val archiveContents = TarArchiveInputStream(inputStream)
            }
            .subscribe()
Run Code Online (Sandbox Code Playgroud)