Meg*_*ind 1 apache-kafka apache-kafka-streams
接收我只是想在CarClass上映射并想要创建新流的Json数据,但是map方法不允许我在自定义数据类型上进行映射。KStream类型的map(KeyValueMapper>)方法不适用于参数(新的KeyValueMapper>(){})?
来自http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations:
该示例将值类型从更改byte[]为Integer。对于Stringto CarClass将是相同的。
KStream<byte[], String> stream = ...;
// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
// Java 7 example
KStream<String, Integer> transformed = stream.map(
new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(byte[] key, String value) {
return new KeyValue<>(value.toLowerCase(), value.length());
}
});
Run Code Online (Sandbox Code Playgroud)
但是,如果您只想修改值,我建议使用mapValues()代替map()。
| 归档时间: |
|
| 查看次数: |
2244 次 |
| 最近记录: |