und*_*ned 0 python node.js apache-kafka kafka-consumer-api apache-kafka-connect
我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04
我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer
class CheckApis():
apisList = {"a": "https://test.eng.com/"}
kafkaProducer = "1.2.3.4:9092"
kafkaTopic = "sometopic"
producer = KafkaProducer(bootstrap_servers=kafkaProducer)
for key, value in apisList.items():
headers = {};
response = requests.request("GET", value, headers=headers)
message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
print (response.text)
print (response.status_code)
producer.close()
Run Code Online (Sandbox Code Playgroud)
这很有效,我可以使用以下命令查看推送的事件:
~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic --from-beginning
Run Code Online (Sandbox Code Playgroud)
但是当我尝试从其他服务器(我的笔记本电脑)远程使用这些事件时,它失败并出现错误:
错误:在侦听 kafka 事件时发生一些错误错误:连接 ECONNREFUSED 5.6.7.8:9092(这里有一些不同的 IP 不是 1.2.3.4)
这是我的消费者代码(在节点 js 中使用 kafka-node):
var ConsumerGroup = require('kafka-node').ConsumerGroup;
var healthConsumerOption = {
host: '1.2.3.4:2181',
autoCommit: true,
groupId: os.hostname(),
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
};
var healthConsumerGroup = new ConsumerGroup(healthConsumerOption, healthTopics);
listenHealthEventsKafka: function(connections){
try{
healthConsumerGroup.on('error', onError);
healthConsumerGroup.on('message', onMessage);
healthConsumerGroup.on('connect', function(){
logger.info("Health consumer group is ready. ")
});
function onMessage(message){
var jsonData = JSON.parse(message.value);
console.log(message);
};
function onError(error){
logger.error("Some error occured while listening to kafka events " +error);
}
process.once('SIGNINT', function(){
async.each([healthConsumerGroup], function(consumer, callback){
logger.info("Closing the kafka health consumer process ");
consumer.close(true, callback);
});
})
}catch(error){
logger.error("Could not connect to kafka events for build " +error);
}
}
Run Code Online (Sandbox Code Playgroud)
我是否需要在 Kafka 服务器(server.properties)上进行额外的配置以允许远程访问......或者我做错了什么?请帮忙。
在您的 Kafka 代理上,server.properties
您需要设置advertised.listeners
为外部 IP,以便客户端可以正确连接到它。否则他们将尝试连接到内部 IP(因为除非明确设置advertised.listeners
,listeners
否则将默认为)
参考:https : //kafka.apache.org/documentation/#brokerconfigs
另见https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
归档时间: |
|
查看次数: |
1815 次 |
最近记录: |