如何使用boto将文件从Amazon S3流式传输到Rackspace Cloudfiles?

joe*_*son 30 python amazon-s3 rackspace cloudfiles boto

我正在将文件从S3复制到Cloudfiles,我想避免将文件写入磁盘.Python-Cloudfiles库有一个看起来像我需要的object.stream()调用,但我找不到boto中的等效调用.我希望我能够做到这样的事情:

shutil.copyfileobj(s3Object.stream(),rsObject.stream())
Run Code Online (Sandbox Code Playgroud)

这可能与boto(或我认为任何其他s3库)?

sma*_*llo 49

此主题中的其他答案与boto相关,但S3.Object在boto3中不再可迭代.因此,以下不起作用,它会产生一条TypeError: 's3.Object' object is not iterable错误消息:

s3 = boto3.session.Session(profile_name=my_profile).resource('s3')
s3_obj = s3.Object(bucket_name=my_bucket, key=my_key)

with io.FileIO('sample.txt', 'w') as file:
    for i in s3_obj:
        file.write(i)
Run Code Online (Sandbox Code Playgroud)

在boto3中,对象的内容也是可用的S3.Object.get()['Body'],因此以下仍然不起作用:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body:
        file.write(i)
Run Code Online (Sandbox Code Playgroud)

因此,另一种方法是使用read方法,但这会将WHOLE S3对象加载到内存中,在处理大型文件时并不总是这样:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body.read():
        file.write(i)
Run Code Online (Sandbox Code Playgroud)

但是该read方法允许传入amt指定我们想要从底层流中读取的字节数的参数.可以重复调用此方法,直到读取整个流:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    while file.write(body.read(amt=512)):
        pass
Run Code Online (Sandbox Code Playgroud)

深入研究botocore.response.StreamingBody代码,我们意识到底层流也是可用的,因此我们可以按如下方式迭代:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for b in body._raw_stream:
        file.write(b)
Run Code Online (Sandbox Code Playgroud)

谷歌搜索时我也看到了一些可以使用的链接,但我还没试过:

  • 非常有用的答案.谢谢@smallo.我很感激你暴露了我认为大多数人都在寻找的私人__raw_stream. (6认同)
  • 如果我传递这个 body `StreamingBody`,这是否意味着 HTTP 连接没有终止?或者流媒体是否已缓冲? (2认同)

gar*_*aat 20

boto中的Key对象,代表S3中的对象,可以像迭代器一样使用,所以你应该能够做到这样的事情:

>>> import boto
>>> c = boto.connect_s3()
>>> bucket = c.lookup('garnaat_pub')
>>> key = bucket.lookup('Scan1.jpg')
>>> for bytes in key:
...   write bytes to output stream
Run Code Online (Sandbox Code Playgroud)

或者,如您的示例所示,您可以执行以下操作:

>>> shutil.copyfileobj(key, rsObject.stream())
Run Code Online (Sandbox Code Playgroud)

  • S3.object 仍然是 iterable ,但是使用 S3object['body'].iter_lines() 所以像这样 (3认同)

Eli*_*Eli 20

我认为至少有一些看到这个问题的人会像我一样,并且想要一种逐行流式传输文件的方法(或用逗号或任何其他分隔符逗号).这是一个简单的方法:

def getS3ResultsAsIterator(self, aws_access_info, key, prefix):        
    s3_conn = S3Connection(**aws_access)
    bucket_obj = s3_conn.get_bucket(key)
    # go through the list of files in the key
    for f in bucket_obj.list(prefix=prefix):
        unfinished_line = ''
        for byte in f:
            byte = unfinished_line + byte
            #split on whatever, or use a regex with re.split()
            lines = byte.split('\n')
            unfinished_line = lines.pop()
            for line in lines:
                yield line
Run Code Online (Sandbox Code Playgroud)

@ garnaat上面的答案仍然很棒且100%正确.希望我的还能帮助别人.

  • 还有一点需要注意:在f:`循环中的`for byte完成后,我必须添加`yield unfinished_line`,否则最后一行不会被处理 (3认同)

Vic*_*Vic 10

BotocoreStreamingBody有一个iter_lines()方法:

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody.iter_lines

所以:

import boto3
s3r = boto3.resource('s3')
iterator = s3r.Object(bucket, key).get()['Body'].iter_lines()

for line in iterator:
    print(line)
Run Code Online (Sandbox Code Playgroud)


小智 7

这是我包装流体的解决方案:

import io
class S3ObjectInterator(io.RawIOBase):
    def __init__(self, bucket, key):
        """Initialize with S3 bucket and key names"""
        self.s3c = boto3.client('s3')
        self.obj_stream = self.s3c.get_object(Bucket=bucket, Key=key)['Body']

    def read(self, n=-1):
        """Read from the stream"""
        return self.obj_stream.read() if n == -1 else self.obj_stream.read(n)
Run Code Online (Sandbox Code Playgroud)

用法示例:

obj_stream = S3ObjectInterator(bucket, key)
for line in obj_stream:
    print line
Run Code Online (Sandbox Code Playgroud)