Google protobuf 和 python 中的多线程

Qui*_*iva 5 python multithreading protocol-buffers google-ads-api

如何在多进程脚本中使用 Google 协议缓冲区?

我的用例是:

  • 从新的 Google Ads API 中提取数据
  • 向对象附加元数据
  • 使用对象建模
  • 将结果推送到数据库

AdWords 广告活动包装对象

我有一个针对旧 AdWords API 的现有流程,我在其中提取数据并将其存储在自定义类中,例如

class Campaign(Represantable):
    def __init__(self, id, managed_customer_id, base_campaign_id, name, status, serving_status):
        self.id = id
        self.managed_customer_id = managed_customer_id
        self.base_campaign_id = base_campaign_id
        self.name = name
        self.status = status
        self.serving_status = serving_status

    @classmethod
    def from_zeep(cls, campaign, managed_customer_id):
        return cls(
            campaign.id,
            managed_customer_id,
            campaign.baseCampaignId,
            campaign.name,
            campaign.status,
            campaign.servingStatus
        )
Run Code Online (Sandbox Code Playgroud)

多处理脚本

如果我想从十几个帐户中提取营销活动,我可以Campaign使用以下命令并行运行填充对象的脚本pathos(再次简化了本示例的代码):

import multiprocessing as mp
from pathos.pools import ProcessPool

class WithParallelism(object):
    def __init__(self, parallelism_level):
        self.parallelism_level = parallelism_level

    def _parallel_apply(self, fn, collection, **kwargs):
        pool = ProcessPool(
            nodes=self.parallelism_level
        )
        
        # this is to prevent Python from printing large traces when user interrupts execution (e.g. Ctrl+C)
        def keyboard_interrupt_wrapper_fn(*args_wrapped):
            try:
                return fn(*args_wrapped, **kwargs)
            except KeyboardInterrupt:
                pass
            except Exception as err:
                return err

        errors = pool.map(keyboard_interrupt_wrapper_fn, collection)

        return error
Run Code Online (Sandbox Code Playgroud)

Google Ads 活动包装对象

使用新的API,我计划将 protobuf 对象存储在我的类中,并使用指针来访问对象属性。我的类比示例复杂得多,使用描述符和子类 init 作为属性,但为了简单起见,它实际上是这样的:

class Campaign(Proto):
    def __init__(self, **kwargs):
        if "proto" in kwargs:
            self._proto = kwargs['proto']
        if "parent" in kwargs:
            self._parent = kwargs['parent']
        self._init_metadata(**kwargs)


    @property
    def id(self):
        return self._proto.id.value

    @property
    def name(self):
        return self._proto.name.value

   ...
Run Code Online (Sandbox Code Playgroud)

这样做的另一个优点是能够遍历parentGoogle Ads 对象,从该对象中提取数据protobuf

但是,当我运行脚本来并行填充这些新对象时,出现错误pickle。据我所知,它multiprocess用于pickle序列化对象,protobuf对象的主要优点之一是它们可以轻松序列化。

我应该如何并行提取新的 Google Ads 数据:

  • Campaign我应该使用序列化和反序列化对象中的数据吗SerializeToString
  • 我是否应该像使用 AdWords 那样提取并存储标量数据 ( id, )name
  • 有完全不同的方法吗?