我是Kafka的新手,并试图为它设置环境.我试图运行单个节点Kafka但我收到错误.
在mac上执行以下步骤
1. brew install zookeeper
2. brew install kafka
3. zkServer start
4. kafka-server-start.sh /usr/local/etc/kafka/server.properties
5.bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6.bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
Run Code Online (Sandbox Code Playgroud)
但我得到了以下错误.如果我遗漏了什么,请告诉我
[2015-10-19 15:48:46,632] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-10-19 15:48:46,637] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic …Run Code Online (Sandbox Code Playgroud) 以下两个依赖项之间有什么区别?我是否真的需要第一个制作消费者或制作人应用程序?
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
我的制作人只与第一个一起工作正常,但消费者需要第二个.
我原以为"kafka-clients"工件对生产者和消费者都有效.但看起来像"kafka.consumer.Consumer"来自其他依赖.为什么会有区别?
另外,为什么第一个神器命名为kafka_2.9.2?即为什么名称中的版本标识符?
我正在尝试使用auto.create.topics.enable = true在我的server.properties文件中连接到我的代理.但是当我尝试使用Java客户端生产者连接到代理时,我得到以下内容error.
1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - kafka生产者I/O线程中未捕获的错误:org.apache.kafka.common.protocol.types.SchemaException:读取字段'topic_metadata'时出错:在org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java)的org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)中读取大小为619631的数组,只有37个字节可用时出错:380)org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)位于org.apache.kafka.clients的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269). producer.internals.Sender.run(Sender.java:229)atg.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)at java.lang.Thread.run(Unknown Source)
以下是我的客户端生产者代码.
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full",true);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{ for(int i = 0; i < 10; i++)
{ producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
System.out.println("Tried sending:"+i);}
}
catch (Exception e){
e.printStackTrace();
}
producer.close();
}
Run Code Online (Sandbox Code Playgroud)
有人可以帮我解决这个问题吗?
我有一个单节点,多(3)个代理Zookeeper/Kafka设置.我正在使用Kafka 0.10 Java客户端.
我写了下面的简单远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址):
Properties config = new Properties();
try {
config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producer = new KafkaProducer<String, byte[]>(config);
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(GATEWAY_SCHEMA);
recordInjection = GenericAvroCodecs.toBinary(schema);
GenericData.Record avroRecord = new GenericData.Record(schema);
//Filling in avroRecord (code not here)
byte[] bytes = recordInjection.apply(avroRecord);
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
RecordMetadata data = future.get();
} catch (Exception e) {
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
我的3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,broker.id为0,1,2,侦听器为PLAINTEXT://:9092,PLAINTEXT://:9093,PLAINTEXT://:9094和host.name是10.2.0.4,10.2.0.5,10.2.0.6).这是第一个服务器属性文件:
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400 …Run Code Online (Sandbox Code Playgroud) 我是java,spring和kafka的新手
我有下一个发送消息的代码
kafkaTemplate.send(topic, message);
Run Code Online (Sandbox Code Playgroud)
我对生产者的配置:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// value to block, after which it will throw a TimeoutException
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
Run Code Online (Sandbox Code Playgroud)
我想与我的消费者群体发送消息(例如"MyConsumerGroup"),
但我不知道如何做到这一点感谢帮助
卡夫卡高级制作人选出领导者的时间和频率是多少?是在发送每条消息之前还是在创建连接时只执行一次?
我们有 3 个 zk 节点集群和 7 个代理。现在我们必须创建一个主题并且必须为这个主题创建分区。
但是我没有找到任何公式来决定我应该为此主题创建多少分区。生产者的速率为 5k 条消息/秒,每条消息的大小为 130 字节。
提前致谢
收到以下错误(Kafka 2.1.0):
2018-12-03 21:22:37.873 错误 37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener :发送带有 key='null' 和 payload='{82, 73, 70 的消息时抛出异常, 70, 36, 96, 19, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 1, 0, 68, -84,.. .' to topic recieved_sound: org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 1269892 字节,大于您使用 max.request.size 配置配置的最大请求大小。
我尝试了各种 SO 帖子中的所有建议。
我的 Producer.properties:
max.request.size=41943040
message.max.bytes=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)
服务器.属性:
socket.request.max.bytes=104857600
message.max.bytes=41943040
max.request.size=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)
ProducerConfig(Spring Boot):
configProps.put("message.max.bytes", "41943040");
configProps.put("max.request.size", "41943040");
configProps.put("replica.fetch.max.bytes", "41943040");
configProps.put("fetch.message.max.bytes", "41943040");
Run Code Online (Sandbox Code Playgroud)
消费者配置(SpringBoot):
props.put("fetch.message.max.bytes", "41943040");
props.put("message.max.bytes", "41943040");
props.put("max.request.size", …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。
我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw
这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。
Python代码:
from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
'default.topic.config': {'acks': "all"}})
ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'
def f():
import time
start = time.time()
for i in xrange(1000000):
try:
producer.produce('test-topic', ss)
except Exception:
producer.poll(1)
try:
producer.produce('test-topic', ss)
except Exception:
producer.flush(30)
producer.produce('test-topic', ss)
producer.poll(0)
producer.flush(30)
print(time.time() - start)
if __name__ == '__main__':
f()
Run Code Online (Sandbox Code Playgroud)
Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。
package com.amit.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.Charset;
import java.util.Properties; …Run Code Online (Sandbox Code Playgroud) 我刚刚开始使用 Kafka 并遇到以下菜鸟错误:
'Value cannot be null.
Parameter name: Value serializer not specified and there is no default serializer defined for type ActMessage.'
Run Code Online (Sandbox Code Playgroud)
当尝试发送类对象、ActMessage 对象而不是示例附带的简单字符串时,会发生这种情况。引发错误的代码行是:
using (var p = new ProducerBuilder<Null, ActMessage>(config ).Build()
Run Code Online (Sandbox Code Playgroud)
我正在使用.net 客户端。
我的理解是,我需要在第一个类型参数中使用默认序列化之一,这是 Kafka 客户端附带的一个,如此处所述,但无法在此 .net 包中找到它们。我想我可以建造一个,但这会浪费时间。
这是一个可重现的示例:
public class ActMessage {
public int SomeId {get;set;}
public string SomeContent {get;set;}
}
class Tester {
void send(){
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using (var p = new ProducerBuilder<Null, ActMessage>(config).Build()) //throws …Run Code Online (Sandbox Code Playgroud)