Jan*_*yne 5 java apache-kafka kafka-consumer-api kafka-producer-api
我目前正在使用Kafka 0.9.0.1.根据我发现的一些消息来源,设置消息大小的方法是修改以下键值server.properties.
我的server.properties文件实际上有这些设置.
message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760
Run Code Online (Sandbox Code Playgroud)
其他可能相关的设置如下.
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试发送大小为4到6 MB的有效负载的消息时,消费者永远不会收到任何消息.生产者似乎发送消息而没有任何异常被抛出.如果我发送较小的有效载荷(如<1 MB),那么消费者确实会收到消息.
关于我在配置设置方面做错了什么的任何想法?
以下是发送消息的示例代码.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
File f = new File(dir, s);
byte[] data = Files.readAllBytes(f.toPath());
Payload payload = new Payload(data); //a simple pojo to store payload
String key = String.valueOf(System.currentTimeMillis());
byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();
Run Code Online (Sandbox Code Playgroud)
以下是接收消息的示例代码.
KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
ConsumerRecord<String, byte[]> records = consumer.poll(100);
for(ConsumerRecord<String, byte[]> record : records) {
long offset = record.offset();
String key = record.key();
byte[] val = record.value();
Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
System.out.println(
System.format("offset=%d, key=%s", offset, key));
}
}
Run Code Online (Sandbox Code Playgroud)
以下是填充生产者和使用者的属性文件的方法.
public static Properties getProducerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("compression.type", "snappy");
props.put("max.request.size", 10485760); //need this
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.partition.fetch.bytes", 10485760); //need this too
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return props;
}
Run Code Online (Sandbox Code Playgroud)
Jane,fetch.message.max.bytes首先不要使用,因为这是来自使用者的属性,不会进入server.properties文件,其次是因为旧版本的使用者,而是max.partition.fetch.bytes在创建Consumer作为部分时使用您用来实例化它的属性.
| 归档时间: |
|
| 查看次数: |
12269 次 |
| 最近记录: |