如何在 Apache Beam 中一起使用 MapElements 和 KV?

Tam*_*ura 5 java java-8 apache-beam

我想做类似的事情:

PCollection<String> a = whatever;
PCollection<KV<String, User>> b = a.apply(
        MapElements.into(TypeDescriptor.of(KV<String, User>.class))
        .via(s -> KV.of(s, new User(s))));
Run Code Online (Sandbox Code Playgroud)

其中 User 是带有 Arvo 编码器的自定义数据类型和考虑字符串的构造函数。

但是,我收到以下错误:

无法从参数化类型中选择

我尝试将其更改为TypeDescriptor.of(KV.class),但后来我得到:

不兼容的类型;必需的 PCollection> 但“apply”被推断为 OutputT:不存在类型变量的实例,因此 PCollection 符合 PCollection>

那么我该如何使用KVwithMapElements呢?

我知道我想要做的事情是可以使用ParDo我可以明确指定如何通过清除来执行类型擦除new DoFn<String, KV<String, User>>ParDo不支持 lambda 函数的地方。由于我们使用的是 Java 8,这似乎不太优雅......

Luk*_*wik 6

由于编译期间Java 中的类型擦除KV<String, User>.class,由于类型变量已被擦除,因此KV.class在运行时KV.class没有足够的信息来推断编码器。

为了解决这个限制,您需要使用一种在编译后保留类型信息的机制。例如您可以使用:

TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(User.class))
Run Code Online (Sandbox Code Playgroud)

这与提供您自己的匿名类相同:

new TypeDescriptor<KV<String, User>> {}
Run Code Online (Sandbox Code Playgroud)

为匿名类提供绑定的类型变量是当前解决 Java 类型擦除问题的方法之一。