标签: kafka-producer-api

在Windows中启动Confluent Schema Registry

我有windows环境和我自己的一套kafka和zookeeper运行.为了使用自定义对象,我开始使用Avro.但我需要启动注册表.下载了Confluent平台并运行了这个:

$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
/c/Confluent/confluent-3.0.0-2.11/confluent-3.0.0/bin/schema-registry-run-class: line 103: C:\Program: No such file or directory
Run Code Online (Sandbox Code Playgroud)

然后我在安装页面上看到了这个:

"Confluent目前不支持Windows.Windows用户可以下载并使用zip和tar档案,但必须直接运行jar文件,而不是使用bin /目录中的包装脚本."

我想知道如何在Windows环境中启动融合模式注册表?

查看脚本的内容,很难解读.

谢谢

avro apache-kafka confluence-rest-api kafka-producer-api confluent

9
推荐指数
2
解决办法
5951
查看次数

设计 Kafka 主题 - 许多主题与一个大主题

考虑一系列不同的事件,推荐的方法是

  • 一个包含所有事件的大主题
  • 不同类型事件的多个主题

哪个选项会更好?

我知道消息不在主题的同一分区中意味着没有顺序保证,但是在做出此决定时还需要考虑其他因素吗?

messaging distributed-computing message-queue apache-kafka kafka-producer-api

9
推荐指数
1
解决办法
8871
查看次数

如何使用Python-kafka管理客户端在Kafka中创建主题?

是否有任何Python kafka管理客户端可用于从python程序创建主题/删除主题?我发现了一些 python api,但它们都没有可用的 Admin api?

Confluence 有 python admin api 吗?

apache-kafka kafka-consumer-api kafka-producer-api confluent-platform

9
推荐指数
2
解决办法
1万
查看次数

在 Kafka 生产者上启用幂等性是否会降低吞吐量

我在休息端点调用中启用了幂等性的kafka 生产者(没有启用一次语义或事务)。我启用它的原因是因为我不希望卡夫卡重试导致任何重复。我担心以下几点:

  • 幂等性会减慢我的端点速度吗?(这个端点需要非常快)
  • 我读了 kafka api 文档,启用幂等性将使重试无限(什么?)
  • 如果我不将幂等性用于事务,我真的需要幂等性吗?

apache-kafka kafka-producer-api

9
推荐指数
1
解决办法
2万
查看次数

将消息发布到Kafka主题时出错

我是Kafka的新手,并试图为它设置环境.我试图运行单个节点Kafka但我收到错误.

在mac上执行以下步骤

1. brew install zookeeper
2. brew install kafka
3. zkServer start
4.  kafka-server-start.sh /usr/local/etc/kafka/server.properties
5.bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6.bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
Run Code Online (Sandbox Code Playgroud)

但我得到了以下错误.如果我遗漏了什么,请告诉我

[2015-10-19 15:48:46,632] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-10-19 15:48:46,637] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api apache-zookeeper

8
推荐指数
2
解决办法
2万
查看次数

我可以忽略org.apache.kafka.common.errors.NotLeaderForPartitionExceptions吗?

我的Apache Kafka制作人(0.9.0.1)间歇地抛出一个

org.apache.kafka.common.errors.NotLeaderForPartitionException
Run Code Online (Sandbox Code Playgroud)

我执行Kafka发送的代码类似于此

final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));

try {
    futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
    interruptedException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
    executionException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}
Run Code Online (Sandbox Code Playgroud)

我赶上NotLeaderForPartitionExceptioncatch (final ExecutionException executionException) {}街区.

可以忽略这个特殊的例外吗?

我的Kafka邮件是否已成功发送?

exception-handling apache-kafka kafka-producer-api

8
推荐指数
1
解决办法
5905
查看次数

如何在java中添加消费者组到消息?

我是java,spring和kafka的新手
我有下一个发送消息的代码

kafkaTemplate.send(topic, message);
Run Code Online (Sandbox Code Playgroud)

我对生产者的配置:

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    // value to block, after which it will throw a TimeoutException
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
Run Code Online (Sandbox Code Playgroud)

我想与我的消费者群体发送消息(例如"MyConsumerGroup"),
但我不知道如何做到这一点感谢帮助

java spring kafka-producer-api

8
推荐指数
1
解决办法
174
查看次数

卡夫卡领导人选举何时发生?

卡夫卡高级制作人选出领导者的时间和频率是多少?是在发送每条消息之前还是在创建连接时只执行一次?

apache-kafka kafka-producer-api apache-kafka-connect

8
推荐指数
1
解决办法
6516
查看次数

Kafka制作人可以创建主题和分区吗?

目前我正在评估不同的消息系统.有一个与Apache Kafka有关的问题,我无法回答.

Kafka制作人是否可以动态创建主题和分区(在现有主题上)?如果是的话,它有什么不利之处吗?

提前致谢

messaging apache-kafka kafka-producer-api

8
推荐指数
2
解决办法
1万
查看次数

Kafka:序列化时的消息大于你用max.request.size配置配置的最大请求大小

收到以下错误(Kafka 2.1.0):

2018-12-03 21:22:37.873 错误 37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener :发送带有 key='null' 和 payload='{82, 73, 70 的消息时抛出异常, 70, 36, 96, 19, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 1, 0, 68, -84,.. .' to topic recieved_sound: org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 1269892 字节,大于您使用 max.request.size 配置配置的最大请求大小。

我尝试了各种 SO 帖子中的所有建议。

我的 Producer.properties:

max.request.size=41943040
message.max.bytes=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

服务器.属性:

socket.request.max.bytes=104857600
message.max.bytes=41943040
max.request.size=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

ProducerConfig(Spring Boot):

configProps.put("message.max.bytes", "41943040");
configProps.put("max.request.size", "41943040");
configProps.put("replica.fetch.max.bytes", "41943040");
configProps.put("fetch.message.max.bytes", "41943040");
Run Code Online (Sandbox Code Playgroud)

消费者配置(SpringBoot):

props.put("fetch.message.max.bytes", "41943040");
props.put("message.max.bytes", "41943040");
props.put("max.request.size", …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

8
推荐指数
1
解决办法
2万
查看次数