大numpy矩阵作为数据流侧输入

Jer*_*emy 1 google-cloud-dataflow apache-beam

我正在尝试用Python编写一个数据流管道,该管道需要一个大的numpy矩阵作为辅助输入。矩阵保存在云存储中。理想情况下,每个Dataflow工作人员都可以直接从云存储中加载矩阵。

我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX),然后

p | "computation" >> beam.Map(computation, matrix)
Run Code Online (Sandbox Code Playgroud)

矩阵从我的笔记本电脑运送到每个Datflow工作人员。

我该如何指导每个工作人员直接从云存储中加载矩阵?有用于“二进制斑点”的光束源吗?

Pab*_*blo 5

您的方法是正确的。

在这种情况下,Dataflow所做的是将NumPy矩阵作为侧面输入进行处理。这意味着它从您的计算机上载到服务一次,然后Dataflow服务会将其发送给每个工作人员。

考虑到矩阵很大,这将使您的工作人员使用I / O从服务中接收它,并承担将整个矩阵保存在内存中的负担,但是它应该可以工作。


如果要避免在计算机中计算/加载矩阵,可以将矩阵作为文本文件上载到GCS,读入该文件并获取矩阵。您可以这样做:

matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))
Run Code Online (Sandbox Code Playgroud)

您的DoFn可能类似于:

class ComputationDoFn(beam.DoFn):
  def __init__(self, matrix_file):
    self._matrix_file = matrix_file
    self._matrix = None

  def start_bundle(self, element):
    # We check because one DoFn instance may be reused
    # for different bundles.
    if self._matrix is None:
      self.load_matrix(self._matrix_file)

  def process(self, element):
    # Now process the element

  def load_matrix(self, matrix_file):
    # Load the file from GCS using the GCS API
Run Code Online (Sandbox Code Playgroud)

我希望这是有道理的。如果您感觉需要更多帮助,我可以完善功能。