在 Beam 中初始化外部服务连接

gan*_*til 3 java google-cloud-datastore google-cloud-dataflow apache-beam

我正在编写一个数据流流管道。在其中一个转换中,DoFn 我想要访问外部服务 - 在本例中,它是数据存储区。

这种初始化步骤有没有最佳实践?我不想为每个 processElement 方法调用创建数据存储连接对象。

Pab*_*blo 5

在 Dataflow SDK 中,您可以做的最简单的事情就是添加一个检查以在第一个元素中初始化外部服务:

class DatastoreCallingDoFn extends DoFn {

    private ExtServiceHandle handle = null;

    private ExtServiceHandle initializeConnection() {
      // ...
    }

    public void processElement(ProcessContext c) {
      // ... process each element -- setup will have been called
      if (handle == null) {
        handle = initializeConnection();
      }
      // Process elements
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您使用 Beam,您可以使用@Setup装饰器来装饰您的 DoFn 中的函数DoFn,例如初始化数据存储连接。

class DatastoreCallingDoFn extends DoFn {
    @Setup
    public void initializeDatastoreConnection() {
      // ...
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // ... process each element -- setup will have been called
    }
}
Run Code Online (Sandbox Code Playgroud)

这与这个问题的答案类似。