Haf*_*sif 11 apache-kafka apache-spark
我在NodeMS中编写了一个Kafka Producer,在Java Maven中编写了Kafka Consumer.我的主题是"test",它是由以下命令创建的:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)
NodeJS中的制片人:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
producer.send([
{ topic: 'test', partition: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partition: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partition: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
], function (err, result) {
console.log(err || result);
process.exit();
});
});
Run Code Online (Sandbox Code Playgroud)
当我从NodeJS生成器发送两条消息时,它被Java Consumer成功使用.但是当我从NodeJS生产者发送三条或更多消息时,它会给我以下错误:
{[BrokerNotAvailableError:找不到领导者]消息:'找不到领导'}
我想问一下,如何将LEADER设置为主题"test"中的任何消息.或者该问题的解决方案应该是什么.
小智 14
而不是partition
使用partitions
(复数键名)。
例如:
producer.on('ready', function () {
producer.send([
{ topic: 'test', partitions: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partitions: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partitions: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
], function (err, result) {
console.log(err || result);
process.exit();
});
});
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
9551 次 |
最近记录: |