我正在学习 kafka,我想将我的应用程序拆分为 2 个微服务。首先将所有从 KafkaConsumer 传入的消息保存到数据库中,然后根据给定的 id 选择实体。其次提供 REST api 来保存和获取实体。它们之间的交互由 kafka 提供。如何使用kafka从REST api中的db接收存储的ID?这是调用 POST 请求的生产者的示例代码。
public void sendToKafka(MyObject myobject) throws ExecutionException, InterruptedException {
LOGGER.info("sending payload='{}' to topic='{}'", myobject, myTopic);
byte[] bytes = parseObjectToByte(myobject);
ListenableFuture<SendResult<String, byte[]>> resultFuture = kafkaTemplate.send(topicSave, bytes);
SendResult<String, byte[]> result = resultFuture.get();
LOGGER.info(result.toString());
}
Run Code Online (Sandbox Code Playgroud)
和消费者,将 myObject 保存到数据库
@KafkaListener(topics = "${kafka.topic.mytopic}")
public void saveMyObject(byte[] value) {
MyObject myobject = parseToMyObject(value);
LOGGER.info("received myobject='{}'", myobject);
MyObject myobjectSaved = myObjectRepository.insert(myobject);
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 spring-kafka 和 spring-boot。Rest api 有两种方法:POST - 保存 myObject Get …