apache 光束数据流中的外部 api 调用

big*_*nty 1 java google-cloud-dataflow apache-beam apache-beam-io

我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现。我在做一个ParDoDoFn每个JSON。

我还没有看到任何在线教程说明如何从 apache beam DoFnDataflow调用外部 API 端点。

我正在使用JAVABeam 的 SDK。我学习的一些教程解释了使用startBundleFinishBundle但我不清楚如何使用它

小智 6

如果您需要为每个 JSON 记录检查外部存储中的重复项,那么您仍然可以使用DoFn它。有几个注释,如@Setup@StartBundle@FinishBundle等,可用于注释DoFn.

例如,如果您需要实例化一个客户端对象以向外部数据库发送请求,那么您可能希望在@Setup方法中执行此操作(如 POJO 构造函数),然后在您的@ProcessElement方法中利用此客户端对象。

让我们考虑一个简单的例子:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id()) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

此外,为了避免对每条记录进行远程调用,您可以将捆绑记录批量放入内部缓冲区(将输入数据束拆分为捆绑包)并以批处理模式检查重复项(如果您的客户支持此功能)。为此,您可能会使用@StartBundle@FinishBundle注释的方法,这些方法将在相应地处理 Beam bundle 之前和之后调用。

对于更复杂的示例,我建议查看不同 Beam IO 中的 Sink 实现,例如KinesisIO