Airflow with the GCP Python library: ValueError: Stream must be at beginning

OPT*_*MUS 4 python google-cloud-platform airflow

I am using Python with Airflow and the GCP Python library. I automated the process of sending files to GCP with Airflow DAGs. The code is as follows:

import sys
from io import BytesIO

for fileid, filename in files_dictionary.items():
    if ftp.size(filename) <= int(MAX_FILE_SIZE):
        data = BytesIO()
        ftp.retrbinary('RETR ' + filename, callback=data.write)
        f = client.File(client, fid=fileid)
        size = sys.getsizeof(data.read())
        # Another option is to use FileIO but not sure how
        f.send(data, filename, size)  # This method is in another library

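The code that triggers the upload lives in the current repository (shown above), but the actual upload is performed by another dependency that is not under our control. The documentation for that method is: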

 def send(self, fp, filename, file_bytes):
        """Send file to cloud
        fp file object
        filename   is the name of the file.
        file_bytes is the size of the file in bytes
        """
        data = self.initiate_resumable_upload(self.getFileid())

        _, blob = self.get_gcs_blob_and_bucket(data)

        # Set attachment filename. Does this work with datasets with folders
        original_filename = filename.rsplit(os.sep, 1)[-1]
        blob.content_disposition = "attachment;filename=" + original_filename

        blob.upload_from_file(fp)

        self.finish_resumable_upload(self.getFileid())

I get the following error:

[2020-04-23 09:43:17,239] {{models.py:1788}} ERROR - Stream must be at beginning.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/transfer_data.py", line 241, in upload
    f.send(data, filename, size)
  File "/usr/local/lib/python3.6/site-packages/client/utils.py", line 53, in wrapper_timer
    value = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/client/client.py", line 518, in send
    blob.upload_from_file(fp)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1158, in upload_from_file
    client, file_obj, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1068, in _do_upload
    client, stream, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1011, in _do_resumable_upload
    predefined_acl=predefined_acl,
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 960, in _initiate_resumable_upload
    stream_final=False,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/requests/upload.py", line 343, in initiate
    stream_final=stream_final,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/_upload.py", line 415, in _prepare_initiate_request
    raise ValueError(u"Stream must be at beginning.")
ValueError: Stream must be at beginning.

小智 7

The upload_from_file function has a parameter, rewind, that makes the seek(0) call for you.

I would change your upload_from_file call to:

blob.upload_from_file(file_obj=fp, rewind=True)

This should fix the problem, and you will not need to add an extra seek() call.
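To make the effect of rewind concrete, here is a minimal sketch that runs without any cloud access. Note that upload_stand_in is a hypothetical function written only for this illustration: it imitates the position check that appears at the bottom of the traceback and is not the google-cloud-storage implementation.

```python
from io import BytesIO


def upload_stand_in(file_obj, rewind=False):
    """Hypothetical stand-in for an upload call; NOT the real library code."""
    if rewind:
        file_obj.seek(0)  # assumed behaviour: rewind=True seeks to the start first
    if file_obj.tell() != 0:
        raise ValueError("Stream must be at beginning.")
    return file_obj.read()


data = BytesIO()
data.write(b"payload")  # writing leaves the position at the end of the buffer

try:
    upload_stand_in(data)  # no rewind: the stand-in rejects the stream
    error = None
except ValueError as exc:
    error = str(exc)

uploaded = upload_stand_in(data, rewind=True)  # rewound first, so it is accepted
```

Since the upload call in the question sits inside a dependency, this option presumably only helps if that dependency can be changed or wrapped.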


rme*_*ves 5

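When you read a binary file or stream, you can move around in it with seek operations. In other words, you can move the current position (the reference) from the beginning of the file to any other location. The error "ValueError: Stream must be at beginning." is essentially saying: "Your reference is not pointing at the beginning of the stream, but it has to."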

Given that, you need to set the reference back to the beginning of the stream. You can do that with the seek function.
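The position behaviour described above can be checked with the standard library alone. This is a small sketch in which a plain write() call stands in for the retrbinary callback; the payload b"hello" is made up for the example.

```python
from io import BytesIO

data = BytesIO()
data.write(b"hello")  # stands in for ftp.retrbinary(..., callback=data.write)

position_after_write = data.tell()  # 5: the position sits at the end
read_at_end = data.read()           # b"": nothing left to read from here

data.seek(0)                        # move the position back to the start
position_after_seek = data.tell()   # 0
read_from_start = data.read()       # b"hello"
```

Writing into a BytesIO always advances the position, so a freshly filled buffer is never at the beginning unless it is rewound.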

In your case, you would do something like:

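    data = BytesIO()
    ftp.retrbinary('RETR ' + filename, callback=data.write)
    f = client.File(client, fid=fileid)
    # Byte count of the buffer; sys.getsizeof(data.read()) would measure an empty bytes object here
    size = data.getbuffer().nbytes
    # Rewind so the upload starts reading from the first byte
    data.seek(0)
    f.send(data, filename, size)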
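A side note on the size passed to send: sys.getsizeof reports the memory footprint of a Python object rather than the payload length, and read() on a stream positioned at its end returns empty bytes. A minimal sketch of how the byte count could be obtained instead, using only the standard library and a made-up 1000-byte payload:

```python
import sys
from io import BytesIO

payload = b"x" * 1000
data = BytesIO()
data.write(payload)

# read() at the end returns b"", so this measures an empty bytes object
misleading = sys.getsizeof(data.read())

# Number of bytes actually held in the buffer, independent of the position
actual_size = data.getbuffer().nbytes

data.seek(0)  # still rewind before handing the stream to an uploader
```

Whether the dependency actually uses the file_bytes argument is not visible from the send method shown in the question, so this matters only if that value is consumed somewhere.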