Aggregating data in windows in Apache Beam

den*_*dog 3 python google-cloud-dataflow apache-beam

I am receiving a stream of complex, nested JSON objects as the input to my pipeline.

My goal is to create small batches for downstream processing via another Pub/Sub topic. I'm struggling with the beam.GroupByKey() function; from what I've read, this is the correct way to try to aggregate.

A simplified example, with input events:

{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a'], url: 'websiteB.com' }

I'm trying to create the following:

{
'websiteA.com': {a:2, b:2, c:2},
'websiteB.com': {a:1},
}

My problem is that trying this on even the simplest tuples throws ValueError: too many values to unpack.

I can run it in two steps, but from my reading, using beam.GroupByKey() is costly and should therefore be minimized.
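For context, a likely cause of that error (my reading of the traceback, not something stated in the original post): GroupByKey, and any downstream code that does `key, value = element`, expects every element to be a (key, value) 2-tuple. Feeding it the raw JSON dicts, or tuples with more than two items, fails exactly this way:

```python
# GroupByKey (and any code that unpacks `key, value = element`) expects
# every element to be a (key, value) 2-tuple.
good = ('websiteA.com', ['a', 'b', 'c'])
key, value = good          # fine: exactly two items

bad = ('websiteA.com', 'a', 'b', 'c')  # a 4-tuple, not (key, value)
try:
    key, value = bad
except ValueError as err:
    print(err)  # "too many values to unpack (expected 2)"
```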

Edit, following @Cubez's answer:

My combine function only seems to half-work :( It looks like add_input is never being called?

Cub*_*bez 7

This is the perfect example of a use case for a combiner. These are transforms used to aggregate or combine collections across multiple workers. As the docs say, CombineFns work by reading in your elements (beam.CombineFn.add_input), merging multiple elements together (beam.CombineFn.merge_accumulators), and finally outputting the final combined value (beam.CombineFn.extract_output). See the Python docs for the parent class here.

For example, to create a combiner that outputs the mean of a collection of numbers:

import apache_beam as beam

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')

pc = ...
average = pc | beam.CombineGlobally(AverageFn())
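To see how those three stages fit together, you can drive the same accumulator logic by hand. This is just a sketch, not part of the original answer: the methods are copied as plain functions so the lifecycle can be run without a pipeline.

```python
# Plain-Python copy of AverageFn's logic, so the CombineFn lifecycle
# (accumulate on two "workers", merge, extract) can be run directly.
def create_accumulator():
    return (0.0, 0)

def add_input(sum_count, value):
    s, n = sum_count
    return s + value, n + 1

def merge_accumulators(accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

def extract_output(sum_count):
    s, n = sum_count
    return s / n if n else float('NaN')

# Worker 1 sees [1, 2]; worker 2 sees [3].
acc1 = create_accumulator()
for v in (1, 2):
    acc1 = add_input(acc1, v)
acc2 = add_input(create_accumulator(), 3)

print(extract_output(merge_accumulators([acc1, acc2])))  # 2.0
```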

For your use case, I would suggest the following:

import collections

import apache_beam as beam

values = [
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a'], 'url': 'websiteB.com'}
]

# This counts the number of elements that are the same.
def combine(counts):
  # A counter is a dictionary from keys to the number of times it has
  # seen that particular key.
  c = collections.Counter()
  for d in counts:
    c.update(d)
  return dict(c)

with beam.Pipeline(options=pipeline_options) as p:
  pc = (p
        # You should replace this step with reading data from your
        # source and transforming it to the proper format for below.
        | 'create' >> beam.Create(values)

        # This step transforms each dictionary into a (key, value)
        # tuple. For this example it returns:
        # [ ('websiteA.com', ['a', 'b', 'c']),
        #   ('websiteA.com', ['a', 'b', 'c']),
        #   ('websiteB.com', ['a'])]
        | 'url as key' >> beam.Map(lambda x: (x['url'], x['data']))

        # This is the magic that combines all elements with the same
        # URL and outputs a count based on the keys in 'data'.
        # This returns the elements:
        # [ ('websiteA.com', {'a': 2, 'b': 2, 'c': 2}),
        #   ('websiteB.com', {'a': 1})]
        | 'combine' >> beam.CombinePerKey(combine))

  # Do something with pc
  new_pc = pc | ...
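Since the stated goal was to publish the batches to another Pub/Sub topic, the final step could serialize each (url, counts) pair to bytes and hand it to beam.io.WriteToPubSub. This is only a sketch: the topic name is a placeholder and the JSON encoding is my assumption, not from the answer.

```python
import json

def to_pubsub_message(kv):
    """Serialize a (url, counts) pair to bytes, as Pub/Sub requires."""
    url, counts = kv
    return json.dumps({url: counts}).encode('utf-8')

print(to_pubsub_message(('websiteB.com', {'a': 1})))
# b'{"websiteB.com": {"a": 1}}'

# In the (streaming) pipeline this would plug in roughly as:
# new_pc = (pc
#           | 'to json' >> beam.Map(to_pubsub_message)
#           | 'publish' >> beam.io.WriteToPubSub(
#                 topic='projects/my-project/topics/my-output-topic'))
```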