小编nan*_*boo的帖子

当原始地图包含集合作为值时,如何创建反向地图?

假设我的原文Map包含以下内容:

Map<String, Set<String>> original = Maps.newHashMap();
original.put("Scott", Sets.newHashSet("Apple", "Pear", "Banana");
original.put("Jack", Sets.newHashSet("Banana", "Apple", "Orange");
Run Code Online (Sandbox Code Playgroud)

我想创建一个Map包含以下内容的反转:

  "Apple":  ["Scott", "Jack"]
  "Pear":   ["Scott"]
  "Banana": ["Scott", "Jack"]
  "Orange": ["Jack"]
Run Code Online (Sandbox Code Playgroud)

我知道它可以用旧方式(Java 8之前的版本)完成,但是如何使用Java Stream API实现相同的功能呢?

Map<String, Set<String>> reversed = original.entrySet().stream().map(x -> ????).collect(??)
Run Code Online (Sandbox Code Playgroud)

有张贴类似的问题在这里,但只适用于单值Map秒.

java java-8 java-stream

6
推荐指数
2
解决办法
83
查看次数

如何配置 Debezium Mysql 连接器来生成原始键而不是 struct 或 json 对象?

我正在使用 Debezium 来检测 MySql 源表中的更改。如何生成 Kafka 消息,使键为数字 ( Long) 值而不是 Json 对象?

我得到什么:

key: {"foo_id": 123} 
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
Run Code Online (Sandbox Code Playgroud)

我想要的是:

key: 123
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
Run Code Online (Sandbox Code Playgroud)

我的 FOO 表如下所示:

foo_id: INT
bar: VARCHAR 
baz: VARCHAR
Run Code Online (Sandbox Code Playgroud)

请注意,我没有使用 avro,并且我已经尝试了以下几种组合(带和不带密钥变压器),但未能获得密钥Long

"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",        
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false", 
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
Run Code Online (Sandbox Code Playgroud)

我不确定 ValueToKey 或 ExtractField 适用于(MySQL)源,但我低于 NPE。

Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44) …
Run Code Online (Sandbox Code Playgroud)

mysql apache-kafka-connect debezium

5
推荐指数
1
解决办法
1571
查看次数