我有以下几点:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
Run Code Online (Sandbox Code Playgroud)
流 B 中的消息需要使用表 A 中的数据进行丰富。
示例数据:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
Run Code Online (Sandbox Code Playgroud)
在一个完美的世界里,我想做
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
Run Code Online (Sandbox Code Playgroud)
不幸的是,这对我不起作用,因为我的数据是这样的,每次将消息写入主题 A 时,也会将相应的消息写入主题 B(源是单个 DB 事务)。现在,在此初始“创建”事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上显示多个事件,但对于给定的键,也可能有几个小时的连续事件。
简单的解决方案不起作用的原因是原始的“创建”事务会导致竞争条件:主题 A 和 B 几乎同时获取他们的消息,如果 B 消息首先到达拓扑的“加入”部分(比如几毫秒)在 A 消息到达那里之前)tableA 还不会包含相应的条目。此时事件丢失。我可以在主题 C 上看到这种情况:有些事件出现,有些没有(如果我使用 leftJoin,所有事件都会出现,但有些具有空键,相当于丢失)。这只是初始“创建”事务的问题。之后每次事件到达主题 B 时,表 A 中都存在相应的条目。
所以我的问题是:你如何解决这个问题?
我目前的解决方案是丑陋的。我所做的是创建了一个“B 集合”并使用以下内容阅读主题 B
B.groupByKey()
.aggregate(() -> new …Run Code Online (Sandbox Code Playgroud) 我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器。连接器产生以下模式:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
Run Code Online (Sandbox Code Playgroud)
实际上,有几个主题,key-schema 总是“int”,value-schema 总是某种记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
Run Code Online (Sandbox Code Playgroud)
起初我尝试使用类似的东西来消费这个主题,
Consumed.with(Serdes.Integer(), userSerde);但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用Consumed.with(Serdes.Bytes(), userSerde);有效,但我真的想要 int 而不是字节,所以我将代码更改为此
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
Run Code Online (Sandbox Code Playgroud)
这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)强制转换)但它允许我使用
Consumed.with(keySerde, userSerde); …
java avro apache-kafka apache-kafka-streams confluent-platform