Parallel downloads with the Google Cloud Storage Python API

Rom*_*man 5 python-3.x google-cloud-storage

Downloading a large number of files to a local machine in parallel with gsutil is trivial by adding the -m flag:

gsutil -m cp gs://my-bucket/blob_prefix* .

In Python, I can only download the files one at a time:

client = storage.Client()
bucket = client.get_bucket(gs_bucket_name)
blobs = [blob for blob in bucket.list_blobs(prefix=blob_prefix)]
for blob in blobs:
    blob.download_to_filename(blob.name)  # one blob at a time, sequentially

Ideally I would like to download the data straight into memory (similar to blob.download_as_string()), and preferably into a generator. The order does not matter.

Does this functionality exist in the Python API?
If not, what is the best way to do this?
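
Roughly what I have in mind is something like the untested sketch below; it only uses concurrent.futures from the standard library plus the client calls already shown above, so the names download_blobs_to_memory and max_workers are mine and not an existing API:

from concurrent.futures import ThreadPoolExecutor, as_completed

from google.cloud import storage


def download_blobs_to_memory(gs_bucket_name, blob_prefix, max_workers=10):
    """Yield (blob_name, blob_bytes) pairs as downloads complete (order not guaranteed)."""
    client = storage.Client()
    bucket = client.get_bucket(gs_bucket_name)
    blobs = list(bucket.list_blobs(prefix=blob_prefix))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # download each blob in a worker thread and yield results as they finish
        future_to_name = {executor.submit(blob.download_as_string): blob.name for blob in blobs}
        for future in as_completed(future_to_name):
            yield future_to_name[future], future.result()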

Edit

In the meantime I have implemented this workaround:

import json
import os
import shutil
import subprocess


def fetch_data_from_storage(fetch_pattern):
    """Download blobs to local disk first, then load them into a generator."""
    tmp_save_dir = os.path.join("/tmp", "tmp_gs_download")
    if os.path.isdir(tmp_save_dir):
        shutil.rmtree(tmp_save_dir)  # empty tmp dir first
    os.makedirs(tmp_save_dir)  # create tmp dir

    # `bucket` is the storage bucket object from the snippet above
    download_command = ["gsutil", "-m", "cp", "gs://{}/{}".format(bucket.name, fetch_pattern), tmp_save_dir]
    subprocess.call(download_command)

    for file in os.listdir(tmp_save_dir):
        with open(os.path.join(tmp_save_dir, file), 'r') as f_data:
            content = json.load(f_data)
            yield content

Please advise if this has already been done somewhere in a better way.

Iñi*_*igo 5

Checking the Python client library for Google Cloud Storage [1], I can conclude that there is no direct way to download multiple files in parallel.

I usually run some tests using the client library (i.e. one file at a time) and compare them against passing the gsutil command through os.system to check the time difference, and going through os.system is much faster (at least for small files). Could you tell me how it goes in your case? This is the code I use (quite simple):

from google.cloud import storage
import time
import os
start_time = time.time()

download_command = "gsutil -m cp gs://<bucket>/* . "
os.system(download_command)
elapsed_time = time.time() - start_time
print(elapsed_time)

I have filed a feature request on your behalf; you can track it here [2].


Rom*_*man 5

OK, here is my multiprocessing, multithreaded solution. This is how it works:

  • 1) Use a subprocess and gsutil ls -l pattern to obtain the list of blob names and their file sizes. This is the pattern entered in __main__.
  • 2) Create batches of blob names based on a maximum batch size. The default is 1 MB, so a single large file will simply form a batch of one.
  • 3) Each batch is sent to a different process. Default processes = cpu_count - 2.
  • 4) Every batch in each process is multithreaded (default max_threads = 10); one batch of threads has to finish before the next batch starts.
  • 5) Each thread downloads a single blob and combines it with its metadata.
  • 6) Results are propagated upwards through shared resources and memory allocation.

Why I wrote this:

  • I also need the metadata, which is lost when using gsutil (critical)
  • I want some retry control in case some parts fail (minor)

Speed comparison for ~2500 small (<50 kB) files (55 MB in total):

  • One file at a time (including metadata): 25m13s
  • gsutil -m cp (without metadata): 0m35s
  • Code below (including metadata): 1m43s

from typing import Iterable, Generator
import logging
import json
import datetime
import re
import subprocess
import multiprocessing
import threading

from google.cloud import storage

logging.basicConfig(level='INFO')
logger = logging.getLogger(__name__)


class StorageDownloader:

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bucket = storage.Client().bucket(bucket_name)

    def create_blob_batches_by_pattern(self, fetch_pattern, max_batch_size=1e6):
        """Fetch all blob names according to the pattern and the blob size.

        :param fetch_pattern: The gsutil matching pattern for files we want to download.
          A gsutil pattern is used instead of blob prefix because it is more powerful.
        :type fetch_pattern: str
        :param max_batch_size: Maximum size per batch in bytes.  Default = 1 MB = 1e6 bytes
        :type max_batch_size: float or int
        :return: Generator of batches of blob names.
        :rtype: Generator of list
        """
        download_command = ["gsutil", "ls", "-l", "gs://{}/{}".format(self.bucket.name, fetch_pattern)]
        logger.info("Gsutil list command command: {}".format(download_command))
        blob_details_raw = subprocess.check_output(download_command).decode()
        regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs:\/\/\S+?\/(\S+)"
        # re.finditer returns a generator so we don't duplicate memory need in case the string is quite large
        cum_batch_size = 0
        batch = []
        batch_nr = 0
        for reg_match in re.finditer(regexp, blob_details_raw):
            blob_name = reg_match.group(2)
            byte_size = int(reg_match.group(1))

            batch.append(blob_name)
            cum_batch_size += byte_size

            if cum_batch_size > max_batch_size:
                batch_nr += 1
                yield batch
                batch = []
                cum_batch_size = 0

        if batch:
            batch_nr += 1
            yield batch  # if we still have a batch left, then it must also be yielded
        logger.info("Created {} batches with roughly max batch size = {} bytes".format(batch_nr, int(max_batch_size)))

    @staticmethod
    def download_batch_into_memory(batch, bucket, include_metadata=True, max_threads=10):
        """Given a batch of storage filenames, download them into memory.

        Downloading the files in a batch is multithreaded.

        :param batch: A list of gs:// filenames to download.
        :type batch: list of str
        :param bucket: The google api bucket.
        :type bucket: google.cloud.storage.bucket.Bucket
        :param include_metadata: True to include metadata
        :type include_metadata: bool
        :param max_threads: Number of threads to use for downloading batch.  Don't increase this over 10.
        :type max_threads: int
        :return: Complete blob contents and metadata.
        :rtype: dict
        """
        def download_blob(blob_name, state):
            """Standalone function so that we can multithread this."""
            blob = bucket.blob(blob_name=blob_name)
            content = json.loads(blob.download_as_string())
            if include_metadata:
                blob.reload()  # fetch the blob's metadata from the API
                metadata = blob.metadata
                if metadata:
                    state[blob_name] = {**content, **metadata}
                    return
            state[blob_name] = content

        batch_data = {bn: {} for bn in batch}
        threads = []
        active_thread_count = 0
        for blobname in batch:
            thread = threading.Thread(target=download_blob, kwargs={"blob_name": blobname, "state": batch_data})
            threads.append(thread)
            thread.start()
            active_thread_count += 1
            if active_thread_count == max_threads:
                # finish up threads in batches of size max_threads.  A better implementation would be a queue
                #   from which the threads can feed, but this is good enough if the blob sizes are roughly the same.
                for thread in threads:
                    thread.join()
                threads = []
                active_thread_count = 0

        # wait for the last of the threads to be finished
        for thread in threads:
            thread.join()
        return batch_data

    def multiprocess_batches(self, batches, max_processes=None):
        """Spawn parallel process for downloading and processing batches.

        :param batches: An iterable of batches, probably a Generator.
        :type batches: Iterable
        :param max_processes: Maximum number of processes to spawn.  None defaults to cpu_count - 2.
        :type max_processes: int or None
        :return: The response from all the processes.
        :rtype: dict
        """
        if max_processes is None:
            max_processes = max(multiprocessing.cpu_count() - 2, 1)
        logger.info("Using up to {} processes to process batches".format(max_processes))

        def single_proc(mp_batch, mp_bucket, batchresults):
            """Standalone function so that we can multiprocess this."""
            proc_res = self.download_batch_into_memory(mp_batch, mp_bucket)
            batchresults.update(proc_res)

        batch_results = multiprocessing.Manager().dict()

        jobs = []
        for batch in batches:
            logger.info("Processing batch with {} elements".format(len(batch)))
            # sharing one client across processes is not safe, so recreate the bucket for each process.
            # note: passing a closure and a bucket object to the child relies on the default "fork" start method (Linux).
            bucket = storage.Client().get_bucket(self.bucket_name)
            proc = multiprocessing.Process(
                target=single_proc,
                kwargs={"mp_batch": batch, "mp_bucket": bucket, "batchresults": batch_results}
            )
            jobs.append(proc)
            proc.start()
            if len(jobs) >= max_processes:
                # wait for the current wave of processes before spawning more
                for job in jobs:
                    job.join()
                jobs = []

        for job in jobs:
            job.join()

        logger.info("finished downloading {} blobs".format(len(batch_results)))
        return batch_results

    def bulk_download_as_dict(self, fetch_pattern):
        """Download blobs from google storage to

        :param fetch_pattern: A gsutil storage pattern.
        :type fetch_pattern: str
        :return: A dict with k,v pairs = {blobname: blob_data}
        :rtype: dict
        """
        start = datetime.datetime.now()
        filename_batches = self.create_blob_batches_by_pattern(fetch_pattern)
        downloaded_data = self.multiprocess_batches(filename_batches)
        logger.info("time taken to download = {}".format(datetime.datetime.now() - start))
        return downloaded_data


if __name__ == '__main__':
    stor = StorageDownloader("mybucket")
    data = stor.bulk_download_as_dict("some_prefix*")

This could still be optimized quite a bit (for example by queueing the threads instead of waiting for a block to finish), but it is good enough for me for now.
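
For the queueing idea above, the manual thread bookkeeping in download_batch_into_memory could be swapped for a thread pool. A rough, untested sketch of that variant (metadata handling omitted; download_batch_into_memory_pooled is a hypothetical name, not part of the measured code):

from concurrent.futures import ThreadPoolExecutor
import json


def download_batch_into_memory_pooled(batch, bucket, max_threads=10):
    """Download a batch of blob names into memory using a thread pool.

    The pool keeps max_threads downloads in flight at all times instead of
    joining the threads wave by wave.
    """
    def download_blob(blob_name):
        blob = bucket.blob(blob_name=blob_name)
        return blob_name, json.loads(blob.download_as_string())

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map preserves input order; completion order does not matter here
        return dict(executor.map(download_blob, batch))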

  • Doesn't this still depend on gsutil? If so, you could simply use the -m flag to parallelize. I would also like to parallelize the copy operation using the python client, but no luck so far (2 upvotes)
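
The gsutil dependency in the answer above is only for listing the blobs and their sizes. If a plain prefix is enough (it is less flexible than a gsutil wildcard pattern), the listing step could be done with the client itself; a rough sketch of such a variant of create_blob_batches_by_pattern (the name create_blob_batches_by_prefix is mine; blob.size is populated from the listing results):

def create_blob_batches_by_prefix(bucket, blob_prefix, max_batch_size=1e6):
    """Batch blob names by cumulative size using the client instead of `gsutil ls -l`."""
    cum_batch_size = 0
    batch = []
    for blob in bucket.list_blobs(prefix=blob_prefix):
        batch.append(blob.name)
        cum_batch_size += blob.size or 0  # size comes from the list response
        if cum_batch_size > max_batch_size:
            yield batch
            batch = []
            cum_batch_size = 0
    if batch:
        yield batch  # final, partially filled batch

The batches it yields are plain lists of blob names, so they could be fed to multiprocess_batches unchanged.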