I have run into an Apache Kafka problem that I don't understand. I subscribe to a topic named "topic-received" on my broker. This is the code:
    protected String readResponse(final String idMessage) {
        if (props != null) {
            kafkaClient = new KafkaConsumer<>(props);
            logger.debug("Subscribed to topic-received");
            kafkaClient.subscribe(Arrays.asList("topic-received"));
            logger.debug("Waiting for reading : topic-received");
            ConsumerRecords<String, String> records =
                    kafkaClient.poll(kafkaConfig.getRead_timeout());
            if (records != null) {
                for (ConsumerRecord<String, String> record : records) {
                    logger.debug("Resultado devuelto : " + record.value());
                    return record.value();
                }
            }
        }
        return null;
    }
While this is running, I send a message to "topic-received" from another point in the application. The code is as follows:
    private void sendMessageToKafkaBroker(String idTopic, String value) {
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(mapProperties());
            ProducerRecord<String, …