将不可搜索的类文件对象流式传输到多个接收器

tre*_*mby 5 python stream python-3.x

我有一个不可搜索的文件对象.特别是它是来自HTTP请求的不确定大小的文件.

import requests
fileobj = requests.get(url, stream=True)
Run Code Online (Sandbox Code Playgroud)

我正在将此文件传输到对Amazon AWS SDK功能的调用,该功能正在将内容写入Amazon S3.这工作正常.

import boto3
s3 = boto3.resource('s3')
s3.bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')
Run Code Online (Sandbox Code Playgroud)

但是,在将其流式传输到S3的同时,我还希望将数据流式传输到另一个进程.这个其他过程可能不需要整个流,可能会在某个时候停止收听; 这很好,不应该影响到S3的流.

重要的是我不要使用太多内存,因为其中一些文件可能非常庞大.出于同样的原因,我不想写任何东西到磁盘.

我不介意任何一个接收器由于另一个慢速而减速,只要S3最终得到整个文件,并且数据转到两个接收器(而不是每个接收器仍然需要它).

在Python(3)中最好的方法是什么?我知道我不能只将相同的文件对象传递给两个接收器,例如

s3.bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=fileobj)
Run Code Online (Sandbox Code Playgroud)

我想我可以为类似文件的对象编写一个包装器,它不仅将任何数据传递给调用者(也就是S3接收器),而且还传递给另一个进程.就像是

class MyFilewrapper(object):
    def __init__(self, fileobj):
        self._fileobj = fileobj
        self._process = subprocess.Popen(['myapp'], stdin=popen.PIPE)
    def read(self, size=-1):
        data = self._fileobj.read(size)
        self._process.stdin.write(data)
        return data

filewrapper = MyFilewrapper(fileobj)
s3.bucket('my-bucket').upload_fileobj(filewrapper, 'target-file-name')
Run Code Online (Sandbox Code Playgroud)

但有更好的方法吗?也许是这样的

streams = StreamDuplicator(fileobj, streams=2)
s3.bucket('my-bucket').upload_fileobj(streams[0], 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=streams[1])
Run Code Online (Sandbox Code Playgroud)

cod*_*kel 1

您的解决方案会出现一些不适MyFilewrapper,因为内部 IO 循环upload_fileobj现在控制着将数据提供给严格来说与上传无关的子进程。

一个“正确的”解决方案将涉及一个上传 API,它提供一个类似文件的对象,用于通过外部循环写入上传流。这将允许您“干净地”将数据提供给两个目标流。

以下示例显示了基本概念。虚构的startupload方法提供了用于上传的类文件对象。当然,您需要添加适当的错误处理等。

fileobj = requests.get(url, stream=True)

upload_fd = s3.bucket('my-bucket').startupload('target-file-name')
other_fd = ... # Popen or whatever

buf = memoryview(bytearray(4046))
while True:
    r = fileobj.read_into(buf)
    if r == 0:
        break

    read_slice = buf[:r]
    upload_fd.write(read_slice)
    other_fd.write(read_slice)
Run Code Online (Sandbox Code Playgroud)