我是卡夫卡的新手.我在我的本地机器上创建了一个java生成器,并在另一台机器上设置了一个Kafka代理,比如M2,在网络上(我可以ping,SSH,连接到这台机器).在Eclipse控制台的Producer端,我收到"Message sent".但是,当我检查机器M2上的控制台消费者时,我看不到这些消息.
我的java生产者代码是:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class KafkaMessageProducer {
/**
* @param args
*/
public static void main(String[] args) {
KafkaMessageProducer reportObj = new KafkaMessageProducer();
reportObj.send();
}
public void send(){
Map<String, Object> config = new HashMap<String, Object>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
int maxMessages = 5;
int count = 0;
while(count < maxMessages){
producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++));
System.out.println("Message send.."+count);
}
producer.close();
}
}
Run Code Online (Sandbox Code Playgroud)
你能告诉我哪里出错了吗?我可以从控制台生产者在机器M2上本地发送消息.注意:即使我将IP地址更改为Kafka代理的完整主机名,它仍然存在同样的问题.
更新:我还认为Producer能够连接到Kafka代理并发送消息,但Kafka Broker不会将这些消息传递给消费者.如果我将IP地址或端口更改为Zookeeper(与Kafka Broker在同一节点上运行),并查看Zookeeper的日志,它将获取Producer ping,然后拒绝该会话.
Update2:我创建了一个Producer jar并在Machine M2上运行了这个jar,它运行起来了.因此,生产者试图连接到Kafka代理的方式似乎有问题.还不确定是什么问题.
我终于找到了答案,我在这里发布以防其他人遇到同样的问题。当您尝试远程连接时,请使用 Kafka 代理设置advertising.hostname。这对我有用。
| 归档时间: |
|
| 查看次数: |
3616 次 |
| 最近记录: |