OPT*_*MUS 4 python google-cloud-platform airflow
I am using Python with Airflow and the GCP Python library. I have automated sending files to GCP with Airflow DAGs. The code is as follows:
for fileid, filename in files_dictionary.items():
    if ftp.size(filename) <= int(MAX_FILE_SIZE):
        data = BytesIO()
        ftp.retrbinary('RETR ' + filename, callback=data.write)
        f = client.File(client, fid=fileid)
        size = sys.getsizeof(data.read())
        # Another option is to use FileIO, but I'm not sure how
        f.send(data, filename, size)  # This method is in another library
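The code that triggers the upload lives in the current repository (shown above), but the actual upload is performed by another dependency that is not under our control. The documentation for that method is: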
def send(self, fp, filename, file_bytes):
    """Send file to cloud
    fp file object
    filename is the name of the file.
    file_bytes is the size of the file in bytes
    """
    data = self.initiate_resumable_upload(self.getFileid())
    _, blob = self.get_gcs_blob_and_bucket(data)
    # Set attachment filename. Does this work with datasets with folders
    original_filename = filename.rsplit(os.sep, 1)[-1]
    blob.content_disposition = "attachment;filename=" + original_filename
    blob.upload_from_file(fp)
    self.finish_resumable_upload(self.getFileid())
I am getting the following error:
[2020-04-23 09:43:17,239] {{models.py:1788}} ERROR - Stream must be at beginning.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/transfer_data.py", line 241, in upload
f.send(data, filename, size)
File "/usr/local/lib/python3.6/site-packages/client/utils.py", line 53, in wrapper_timer
value = func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/client/client.py", line 518, in send
blob.upload_from_file(fp)
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1158, in upload_from_file
client, file_obj, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1068, in _do_upload
client, stream, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1011, in _do_resumable_upload
predefined_acl=predefined_acl,
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 960, in _initiate_resumable_upload
stream_final=False,
File "/usr/local/lib/python3.6/site-packages/google/resumable_media/requests/upload.py", line 343, in initiate
stream_final=stream_final,
File "/usr/local/lib/python3.6/site-packages/google/resumable_media/_upload.py", line 415, in _prepare_initiate_request
raise ValueError(u"Stream must be at beginning.")
ValueError: Stream must be at beginning.
小智 7
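The upload_from_file function has a parameter that handles the seek(0) call for you.
I would change your upload_from_file call to:
blob.upload_from_file(file_obj=fp, rewind=True)
This should fix the problem, and you won't need to add a separate seek() call.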
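To illustrate why rewinding helps, here is a simplified stand-in for the position check that produces this error. The function fake_upload is hypothetical: it only mimics the behaviour visible in the traceback, it is not the library's code, and it uploads nothing.

```python
from io import BytesIO


def fake_upload(stream, rewind=False):
    """Hypothetical stand-in for an upload call: it refuses streams that are
    not positioned at byte 0, unless asked to rewind them first."""
    if rewind:
        stream.seek(0)  # same effect as calling seek(0) yourself
    if stream.tell() != 0:
        raise ValueError("Stream must be at beginning.")
    return len(stream.read())  # pretend to upload; report bytes consumed


buf = BytesIO()
buf.write(b"payload")  # writing leaves the position at the end of the data

try:
    fake_upload(buf)
    failed = False
except ValueError:
    failed = True  # the un-rewound stream is rejected

uploaded = fake_upload(buf, rewind=True)  # rewinding first lets the call succeed
```

Note that rewind=True has to be passed where upload_from_file is called, so it only applies if you are able to change that call.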
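When you read a binary file, you can move around in it with seek operations. In other words, you can move the current position from the beginning of the file to any other location. The error ValueError: Stream must be at beginning. is basically saying: "Your position is not at the beginning of the stream, but it has to be."
Given that, you need to set the position back to the beginning of the stream. You can do that with the seek function.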
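A small sketch of how the position moves in an in-memory stream, using only the standard library. The byte string is made-up sample data standing in for a downloaded file.

```python
from io import BytesIO

data = BytesIO()
data.write(b"hello world")      # similar to what a write callback does

pos_after_write = data.tell()   # position sits just past the written bytes
leftover = data.read()          # nothing is left to read from that position

data.seek(6)                    # jump to an arbitrary offset
tail = data.read()              # reads from offset 6 to the end

data.seek(0)                    # move the position back to the start
pos_after_seek = data.tell()
content = data.read()           # now the whole buffer is returned
```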
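In your case, you would do something like:
data = BytesIO()
ftp.retrbinary('RETR ' + filename, callback=data.write)
f = client.File(client, fid=fileid)
# Use the buffer length: data.read() returns b'' here because the position is at the end
size = data.getbuffer().nbytes
data.seek(0)  # rewind so the upload starts at the beginning of the stream
f.send(data, filename, size)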
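The size passed to send deserves care too. sys.getsizeof(data.read()), as used in the question, reports the memory footprint of a Python object rather than the payload length, and data.read() returns an empty bytes object when the position is already at the end. A minimal sketch of one way to get the byte count of any seekable stream and rewind it in one step; payload_size_and_rewind is a hypothetical helper name, not part of any library mentioned here.

```python
import os
from io import BytesIO


def payload_size_and_rewind(stream):
    """Return the number of bytes in a seekable stream and leave it at position 0."""
    stream.seek(0, os.SEEK_END)   # jump to the end of the stream
    size = stream.tell()          # offset of the end equals the total byte count
    stream.seek(0)                # rewind for whoever consumes the stream next
    return size


data = BytesIO()
data.write(b"x" * 1024)           # stands in for the FTP download
size = payload_size_and_rewind(data)
```

Because it relies only on seek and tell, the same helper should also work for a file opened in binary mode, such as a temporary file used instead of BytesIO.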