无法将beam python pcollection转换为列表

sam*_*muq 1 python google-cloud-dataflow apache-beam

TypeError: 'PCollection' object does not support indexing
Run Code Online (Sandbox Code Playgroud)

上述错误是由于尝试将 Pcollection 转换为列表而导致的:

filesList = (files | beam.combiners.ToList())

lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
            | 'map' >> beam.Map(_to_dictionary))
Run Code Online (Sandbox Code Playgroud)

和:

def ReadSHP(self, filesList):
    """
    """
    sf = shp.Reader(shp=filesList[1], dbf=filesList[2])  
Run Code Online (Sandbox Code Playgroud)

如何解决这个问题?任何帮助表示赞赏。

Ant*_*ton 5

通常,您不能将 a 转换PCollection为列表。

PCollection是潜在无界且无序的项目集合。Beam 允许您将转换应用于PCollection. 将 aPTransform应用于 a 会PCollection产生另一个PCollection。并且转换的应用过程可能分布在一组机器上。所以一般情况下不可能将这样的东西转换成本地内存中的元素集合。

组合器只是PTransforms. 他们所做的就是将他们看到的所有元素累加起来,对元素应用一些组合逻辑,然后输出组合的结果。例如,组合器可以查看传入的元素,将它们相加,然后输出总和作为结果。这种组合器将PCollection元素的 a转换PCollection为这些元素的总和。

beam.combiners.ToList只是应用于 a 的另一个转换PCollection,可能是在一组工作机器上,并产生另一个PCollection. 但是它在产生输出元素之前并没有真正进行任何复杂的组合,它只是将所有看到的元素累积到一个列表中,然后输出看到的元素列表。因此,它采用键值对(在多台机器上)的元素,将它们放入列表中,然后输出这些列表。

缺少的是从可能的多台机器中获取这些列表并在需要时将它们加载到本地程序的逻辑。这个问题不能轻易(如果有的话)以通用方式(在所有运行程序、所有可能的 IO 和管道结构之间)解决。

解决方法之一是向管道添加另一个步骤,将组合输出(例如总和或列表)写入公共存储,例如某个数据库中的表或文件。然后当管道完成时,您的程序可以从该位置加载管道执行的结果。

有关详细信息,请参阅文档: