标签: kafka-producer-api

将消息发布到Kafka主题时出错

我是Kafka的新手,并试图为它设置环境.我试图运行单个节点Kafka但我收到错误.

在mac上执行以下步骤

1. brew install zookeeper
2. brew install kafka
3. zkServer start
4.  kafka-server-start.sh /usr/local/etc/kafka/server.properties
5.bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6.bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
Run Code Online (Sandbox Code Playgroud)

但我得到了以下错误.如果我遗漏了什么,请告诉我

[2015-10-19 15:48:46,632] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-10-19 15:48:46,637] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api apache-zookeeper

8
推荐指数
2
解决办法
2万
查看次数

Kafka Maven依赖

以下两个依赖项之间有什么区别?我是否真的需要第一个制作消费者或制作人应用程序?

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)

我的制作人只与第一个一起工作正常,但消费者需要第二个.

我原以为"kafka-clients"工件对生产者和消费者都有效.但看起来像"kafka.consumer.Consumer"来自其他依赖.为什么会有区别?

另外,为什么第一个神器命名为kafka_2.9.2?即为什么名称中的版本标识符?

maven apache-kafka kafka-consumer-api kafka-producer-api

8
推荐指数
1
解决办法
1万
查看次数

在Kafka中读取字段'topic_metadata'时出错

我正在尝试使用auto.create.topics.enable = true在我的server.properties文件中连接到我的代理.但是当我尝试使用Java客户端生产者连接到代理时,我得到以下内容error.

1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - kafka生产者I/O线程中未捕获的错误:org.apache.kafka.common.protocol.types.SchemaException:读取字段'topic_metadata'时出错:在org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java)的org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)中读取大小为619631的数组,只有37个字节可用时出错:380)org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)位于org.apache.kafka.clients的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269). producer.internals.Sender.run(Sender.java:229)atg.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)at java.lang.Thread.run(Unknown Source)

以下是我的客户端生产者代码.

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}
Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?

apache-kafka kafka-producer-api

8
推荐指数
1
解决办法
1万
查看次数

Kafka 0.10 Java客户端TimeoutException:包含1条记录的批处理已过期

我有一个单节点,多(3)个代理Zookeeper/Kafka设置.我正在使用Kafka 0.10 Java客户端.

我写了下面的简单远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    //Filling in avroRecord (code not here)
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)

我的3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,broker.id为0,1,2,侦听器为PLAINTEXT://:9092,PLAINTEXT://:9093,PLAINTEXT://:9094和host.name是10.2.0.4,10.2.0.5,10.2.0.6).这是第一个服务器属性文件:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400 …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

8
推荐指数
1
解决办法
9486
查看次数

如何在java中添加消费者组到消息?

我是java,spring和kafka的新手
我有下一个发送消息的代码

kafkaTemplate.send(topic, message);
Run Code Online (Sandbox Code Playgroud)

我对生产者的配置:

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    // value to block, after which it will throw a TimeoutException
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
Run Code Online (Sandbox Code Playgroud)

我想与我的消费者群体发送消息(例如"MyConsumerGroup"),
但我不知道如何做到这一点感谢帮助

java spring kafka-producer-api

8
推荐指数
1
解决办法
174
查看次数

卡夫卡领导人选举何时发生?

卡夫卡高级制作人选出领导者的时间和频率是多少?是在发送每条消息之前还是在创建连接时只执行一次?

apache-kafka kafka-producer-api apache-kafka-connect

8
推荐指数
1
解决办法
6516
查看次数

如何为kafka主题选择分区数?

我们有 3 个 zk 节点集群和 7 个代理。现在我们必须创建一个主题并且必须为这个主题创建分区。

但是我没有找到任何公式来决定我应该为此主题创建多少分区。生产者的速率为 5k 条消息/秒,每条消息的大小为 130 字节。

提前致谢

apache-kafka kafka-consumer-api kafka-producer-api

8
推荐指数
2
解决办法
1万
查看次数

Kafka:序列化时的消息大于你用max.request.size配置配置的最大请求大小

收到以下错误(Kafka 2.1.0):

2018-12-03 21:22:37.873 错误 37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener :发送带有 key='null' 和 payload='{82, 73, 70 的消息时抛出异常, 70, 36, 96, 19, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 1, 0, 68, -84,.. .' to topic recieved_sound: org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 1269892 字节,大于您使用 max.request.size 配置配置的最大请求大小。

我尝试了各种 SO 帖子中的所有建议。

我的 Producer.properties:

max.request.size=41943040
message.max.bytes=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

服务器.属性:

socket.request.max.bytes=104857600
message.max.bytes=41943040
max.request.size=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

ProducerConfig(Spring Boot):

configProps.put("message.max.bytes", "41943040");
configProps.put("max.request.size", "41943040");
configProps.put("replica.fetch.max.bytes", "41943040");
configProps.put("fetch.message.max.bytes", "41943040");
Run Code Online (Sandbox Code Playgroud)

消费者配置(SpringBoot):

props.put("fetch.message.max.bytes", "41943040");
props.put("message.max.bytes", "41943040");
props.put("max.request.size", …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

8
推荐指数
1
解决办法
2万
查看次数

Python librdkafka生产者针对本机Apache Kafka生产者执行

我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。

我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。

Python代码:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
    'default.topic.config': {'acks': "all"}})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in xrange(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()
Run Code Online (Sandbox Code Playgroud)

Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.Properties; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api confluent-kafka librdkafka

8
推荐指数
1
解决办法
455
查看次数

Kafka 生产者错误:“未指定值序列化器,并且没有为类型定义默认序列化器...”

我刚刚开始使用 Kafka 并遇到以下菜鸟错误:

'Value cannot be null.
Parameter name: Value serializer not specified and there is no default serializer defined for type ActMessage.'
Run Code Online (Sandbox Code Playgroud)

当尝试发送类对象、ActMessage 对象而不是示例附带的简单字符串时,会发生这种情况。引发错误的代码行是:

using (var p = new ProducerBuilder<Null, ActMessage>(config ).Build()
Run Code Online (Sandbox Code Playgroud)

我正在使用.net 客户端。

我的理解是,我需要在第一个类型参数中使用默认序列化之一,这是 Kafka 客户端附带的一个,如此处所述但无法在此 .net 包中找到它们。我想我可以建造一个,但这会浪费时间。

这是一个可重现的示例:

public class ActMessage  {
    public int SomeId {get;set;}
    public string SomeContent {get;set;}
}

class Tester  {

void send(){

    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var p = new ProducerBuilder<Null, ActMessage>(config).Build()) //throws …
Run Code Online (Sandbox Code Playgroud)

c# apache-kafka kafka-producer-api confluent-kafka-dotnet

7
推荐指数
1
解决办法
6551
查看次数