运行梁管道时,“PBegin”对象没有属性“windowing”

Pra*_*mar 3 python-3.x google-cloud-dataflow apache-beam apache-beam-io

我在运行数据流作业时发现“PBegin”对象没有属性“windowing”。我在 pardo 函数中调用 connectclass 类。

我正在尝试从 Beam python SDK 连接 NOSQL 数据库并运行 sql 以从表中提取数据。然后使用另一个 pardo 将输出写入单独的文件。

class Connector(beam.DoFn):
    def __init__(self,username,seeds,keyspace,password,datacenter=None):
    self.username = username
    self.password = password
    self.seeds = seeds
    self.keyspace = keyspace
    self.datacenter = datacenter
    super(self.__class__, self).__init__()

    def process(self, element):

    if datacenter:
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
    auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
    cluster = Cluster(contact_points=self.seeds,
                      load_balancing_policy=load_balancing_policy,
                      auth_provider=auth_provider)
    session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
    rows = session.execute(SQL Query)
    yield rows
Run Code Online (Sandbox Code Playgroud)

Tho*_* W. 6

刚刚偶然发现了同样的问题。尝试连接到 RDBMS 源,但我想就实现设计而言,NoSQL 和 SQL 数据库之间没有区别。

除了 Jayadeep Jayaraman 的建议之外,恕我直言,这可以通过使用 ParDo 来实现。实际上,如果您的用例可以接受这样做的限制,则使用 ParDo 进行连接是Beam 文档建议的做法:

对于有界(批量)源,当前有两种创建 Beam 源的选项:

使用 ParDo 和 GroupByKey。

使用 Source 接口并扩展 BoundedSource 抽象子类。

ParDo 是推荐的选项,因为实现 Source 可能很棘手。请参阅何时使用>源接口,获取您可能想要使用源>>的一些用例列表(例如动态工作重新平衡)。

您没有展示您如何使用您的 DoFn。对我来说,记住 DoFn 作用于现有 PCollection 的元素是有帮助的。它本身无法从头开始创建 DoFn。因此,为了克服您提到的问题,您可能需要从内存创建一个 PCollection,其中包含一个用于从源检索数据的查询元素。然后应用从源读取的 ParDo 到此 PCollection。

顺便说一句:我为每个分区设计了一个元素,我想从 Pcollection 中的 RDBMS 中读取数据 - 这样就可以从 SQL 数据库中并行读取数据。

解决方案可能如下所示:

p | beam.Create(["Your Query / source object qualifier goes here"]) 
  | "Read from Database" >> beam.ParDo(YourConnector())
Run Code Online (Sandbox Code Playgroud)

我还要提一下,使用 DoFn 的 start_bundle 和 finish_bundle 方法来设置/断开连接可能是个好主意