Jer*_*emy 1 google-cloud-dataflow apache-beam
我正在尝试用Python编写一个数据流管道,该管道需要一个大的numpy矩阵作为辅助输入。矩阵保存在云存储中。理想情况下,每个Dataflow工作人员都可以直接从云存储中加载矩阵。
我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX),然后
p | "computation" >> beam.Map(computation, matrix)
Run Code Online (Sandbox Code Playgroud)
矩阵从我的笔记本电脑运送到每个Datflow工作人员。
我该如何指导每个工作人员直接从云存储中加载矩阵?有用于“二进制斑点”的光束源吗?
您的方法是正确的。
在这种情况下,Dataflow所做的是将NumPy矩阵作为侧面输入进行处理。这意味着它从您的计算机上载到服务一次,然后Dataflow服务会将其发送给每个工作人员。
考虑到矩阵很大,这将使您的工作人员使用I / O从服务中接收它,并承担将整个矩阵保存在内存中的负担,但是它应该可以工作。
如果要避免在计算机中计算/加载矩阵,可以将矩阵作为文本文件上载到GCS,读入该文件并获取矩阵。您可以这样做:
matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))
Run Code Online (Sandbox Code Playgroud)
您的DoFn可能类似于:
class ComputationDoFn(beam.DoFn):
def __init__(self, matrix_file):
self._matrix_file = matrix_file
self._matrix = None
def start_bundle(self, element):
# We check because one DoFn instance may be reused
# for different bundles.
if self._matrix is None:
self.load_matrix(self._matrix_file)
def process(self, element):
# Now process the element
def load_matrix(self, matrix_file):
# Load the file from GCS using the GCS API
Run Code Online (Sandbox Code Playgroud)
我希望这是有道理的。如果您感觉需要更多帮助,我可以完善功能。
| 归档时间: |
|
| 查看次数: |
784 次 |
| 最近记录: |