小编ora*_*yer的帖子

如何从Google Dataflow中的PCollection中获取元素列表并在管道中使用它来循环写入转换?

我正在使用带有Python SDK的Google Cloud Dataflow.

我想要 :

  • 从主PCollection中获取唯一日期列表
  • 循环遍历该列表中的日期以创建过滤的PCollections(每个都具有唯一的日期),并将每个过滤的PCollection写入BigQuery中时间分区表中的分区.

我怎样才能获得该列表?在下面的combine变换之后,我创建了一个ListPCollectionView对象但是我无法迭代该对象:

class ToUniqueList(beam.CombineFn):

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        if element not in accumulator:
            accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        return list(set(accumulators))

    def extract_output(self, accumulator):
        return accumulator


def get_list_of_dates(pcoll):

    return (pcoll
            | 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
Run Code Online (Sandbox Code Playgroud)

我做错了吗?最好的方法是什么?

谢谢.

python google-bigquery google-cloud-dataflow apache-beam

2
推荐指数
1
解决办法
4427
查看次数