Pipeline fails when adding the ReadAllFromText transform

Rah*_*eel 3 apache-beam apache-beam-io

I'm trying to run a very simple program in Apache Beam to see how it works.

import apache_beam as beam


class Split(beam.DoFn):
    def process(self, element):
        return element


with beam.Pipeline() as p:
    rows = (p | beam.io.ReadAllFromText(
        "input.csv") | beam.ParDo(Split()))

When I run this program, I get the following error:

.... some more stack....
 File "/home/raheel/code/beam-practice/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 565, in expand
    windowing_saved = pcoll.windowing
  File "/home/raheel/code/beam-practice/lib/python2.7/site-packages/apache_beam/pvalue.py", line 137, in windowing
    self.producer.inputs)
  File "/home/raheel/code/beam-practice/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 464, in get_windowing
    return inputs[0].windowing
  File "/home/raheel/code/beam-practice/lib/python2.7/site-packages/apache_beam/pvalue.py", line 137, in windowing
    self.producer.inputs)
  File "/home/raheel/code/beam-practice/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 464, in get_windowing
    return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'

Any idea what's going wrong here?

Thanks

Gui*_*ins 5

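ReadAllFromText expects to read from a PCollection of file names, not from a file name passed as an argument. It therefore cannot be applied directly to the pipeline (a PBegin), which is why the windowing lookup fails. In your case, it should be: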

rows = (p
        | beam.Create(["input.csv"])
        | beam.io.ReadAllFromText())