当使用 Integer 作为键时,这不是问题,kafka 应该能够将 Strings 作为键处理。
ProducerFactory<String, String> pf =
new DefaultKafkaProducerFactory<String, String>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
ProducerRecord<String,String> pr = new ProducerRecord<>("my-topic", "key1","test");
template.send(pr);`
Run Code Online (Sandbox Code Playgroud)
它抛出以下异常:
Org.apache.kafka.common.errors.SerializationException:无法将类 java.lang.String 的键转换为 key.serializer 中指定的类 org.apache.kafka.common.serialization.IntegerSerializer
引起原因:java.lang.ClassCastException:java.lang.String无法在org.apache.kafka.common.serialization.IntegerSerializer.serialize(IntegerSerializer.java:21)在org.apache.kafka处转换为java.lang.Integer .common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) 在 org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) 在 org.apache.kafka.clients. Producer .KafkaProducer.doSend(KafkaProducer.java:799) 在 org.apache.kafka.clients. Producer.KafkaProducer.send(KafkaProducer.java:784) 在 org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java :285)在org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:357)在org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:206)
我有两个主题,一个带有3个分区,一个带有48个分区。
最初,我使用默认分配器,但是当使用者(kubernetes中的Pod)崩溃时遇到了一些问题。
发生的事情是,当吊舱再次出现时,它重新分配了带有3个分区的主题分区,分配了带有48个分区的主题分区0。
未崩溃的两个Pod从该主题分配了16个分区和32个分区,其中包含48个分区。
我已经通过使用循环分区分配器解决了这个问题,但是由于我使用的是kstream-kstream联接,因此现在我对分区的分布方式没有信心,为此,我们需要确保将使用者分配给相同的对象为所有使用者划分分区,例如C1:(t1:p0,t2:p0)C2(t1:p1,t2:p1)等。
我想到的一件事是,我可以重新输入即将发生的事件,以便它们可以重新分区,然后我可以保证这一点?
也许我不明白默认分区的工作方式。