标签: kafka-producer-api

如果我不关闭 kafka 生产者会发生什么

我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭生产者时它trowsjava.lang.IllegalStateException: Cannot send after the producer is closed.我读的地方,我可以离开了制片人开放。我的问题是:这意味着什么,或者是否有更好的解决方案。

- -编辑 - -

<list>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
...
</list>
Run Code Online (Sandbox Code Playgroud)

想象以下场景:

  • 我们读取标签并创建 kafka 生产者
  • 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。- 当我们读取元素时,我们在生产者中调用 close 方法

问题是元素的数量可能是 80k,因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法,但是会影响性能

java apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
1万
查看次数

在 Kafka 中设计消息键的最佳方法是什么?

我有一个分区主题,其中有X分区。

截至目前,在生成消息时,我ProducerRecord只创建了 Kafka 的指定topicvalue. 我没有定义key. 据我所知,我的消息将使用默认的内置分区器在分区之间均匀分布。另一方面,我有一个 Kafka 消费者线程池。每个 Kafka 消费者都将在其自己的专用线程中运行,以消费来自主题的消息。这些消费者中的每一个都被赋予相同的group.id. 这将允许并行使用消息。每个消费者都将被分配其公平份额的分区以供读取。

我希望我的消息以有序的方式被消费。我知道 Kafka 保证了分区内消息的顺序。所以,只要我想出一个合适的密钥结构,我就会对我的消息进行分区,使它们最终在同一个分区中。在某种程度上,消息键将消息分组并将它们存储在分区中。

是否有意义?

问:是否有可能由于设计不当的密钥而导致分区不均匀?一个人可能会收到比其他人更多的记录。它会以一种糟糕的方式影响我的 Kafka 集群的性能吗?消息密钥设计的最佳实践是什么?

multithreading multiprocessing apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
8372
查看次数

当 kafka 服务器关闭时,Kafka 生产者无限期地发送块

我正在使用 Kafka 0.11.0.0。我有一个发布到 Kafka 主题的测试程序;如果 zookeeper 和 Kafka 服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对 KafkaProducer<>.send() 的调用将无限期挂起。

我要么需要让 send() 返回,最好是指出错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,傻瓜,启动 Kafka!” 而不是挂。

我的生产者任务有没有办法确定服务器是启动还是关闭?

我像这样调用 send() :

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});
Run Code Online (Sandbox Code Playgroud)

我有 linger.ms = 1; 我试过 retries=0、1 和 2,而 send() 仍然阻塞。我从未见过调用回调。

较旧的消息建议将 metadata.fetch.timeout.ms 设置为一个较小的值,但这在 0.11 中已消失。其他人建议调用命令行实用程序来查看服务器是否正常……但引用的实用程序似乎也不见了。

完成这项工作的优雅方式是什么?

apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
5652
查看次数

获取Kafka压缩消息大小

我想知道 kafka 中消息的压缩大小。

我使用 kafka 1.1.0 和 java kafka-connect 1.1.0 将消息从我的生产者发送到主题。

如果消息对我的制作人来说太大,我会收到

该消息在序列化时为 xxx 字节,大于您使用 max.request.size 配置配置的最大请求大小。

将 max.request.size 设置为合适的值会导致来自代理的错误消息,因为 message.max.bytes 也必须在代理配置中进行相应调整。不幸的是,错误消息不包括代理收到的消息的大小。我调整了 message.max.bytes。到现在为止还挺好。

如果我在生产者端激活压缩,max.request.size 仍然必须与没有压缩的大小相同,因为不幸的是,代码在压缩之前比较了未压缩消息的大小(请参阅https://issues.apache .org/jira/browse/KAFKA-4169 )

但是通过压缩,我将能够减少代理中的 message.max.bytes。问题是我在任何时候都无法确定此压缩消息的大小。有没有办法在发送消息之前或稍后在日志文件中在生产者代码中弄清楚这一点?

在我使用压缩的情况下,message.max.bytes 的默认值 1MB 就足够了,所以我不必更改默认配置。但我想知道我的压缩消息是远低于 1MB 还是只有 0.99MB。在这种情况下,我可能会在生产中增加 message.max.bytes 以避免出现问题。

提前感谢您的支持。

java apache-kafka kafka-producer-api apache-kafka-connect

7
推荐指数
1
解决办法
3693
查看次数

在 producer.send 期间获取 ProducerFencedException 的原因是什么?

尝试将大约 50K 条消息加载到 KAFKA 主题中。在少数运行开始时低于异常但并非总是如此。

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state  
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]  
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]  
at  org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]  
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]  
...  
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.  
Run Code Online (Sandbox Code Playgroud)

代码块如下:

public void persistUpdatesPostAction(List<Message> messageList ) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
8558
查看次数

处理 Kafka Producer 超时异常的指南?

由于我的 Kafka 制作人的各种原因,我经常收到超时异常。我目前正在使用生产者配置的所有默认值。

我见过以下超时异常:

org.apache.kafka.common.errors.TimeoutException:60000 毫秒后无法更新元数据。

org.apache.kafka.common.errors.TimeoutException:主题 1-​​0 的 1 条记录到期:自上次追加以来已经过去了 30001 毫秒

我有以下问题:

  1. 这些超时异常的一般原因是什么?

    1. 临时网络问题
    2. 服务器问题?如果是,那么什么样的服务器问题?
  2. 处理超时异常的一般准则是什么?

    1. 设置“重试”配置以便 Kafka API 进行重试?
    2. 增加 'request.timeout.ms' 或 'max.block.ms' ?
    3. 捕获异常并让应用程序层重试发送消息,但这对于异步发送似乎很难,因为消息将被乱序发送?
  3. 超时异常是可重试的异常吗?重试它们是否安全?

我正在使用 Kafka v2.1.0 和 Java 11。

提前致谢。

java timeoutexception apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
8588
查看次数

如何确定Kafka的API版本?

我正在使用kafka-python访问 Kafka。我尝试创建一个 Kafka Producer:

kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
Run Code Online (Sandbox Code Playgroud)

但这会因kafka.errors.NoBrokersAvailable: NoBrokersAvailable异常而失败。

我发现我需要向api_versionKafkaProducer添加参数:

kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                               api_version=(0, 10, 1))
Run Code Online (Sandbox Code Playgroud)

此命令有效。

问题是:如何确定 的值api_version

kafka-broker-api-versions.sh --bootstrap-server localhost:9092给了我一些东西,但我不确定是否有我可以使用的号码。我尝试了随机值api_version=(20, 2, 1),它也有效。

apache-kafka kafka-python kafka-producer-api

7
推荐指数
1
解决办法
6086
查看次数

可以与 Zookeeper 交谈,但不能与消息代理交谈

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在 Type 的每条消息之后,A我立即收到一个 type B... 并最终收到一个type C

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,然后立即再次关闭它?

编辑

  • 该主题已存在,并kafka-topics.sh --list显示它。

  • 我用过的所有客户端都遇到同样的问题:Kafka's kafka-console-producer.shkafka-pythonconfluent-kafkakafkacat

  • Kafka 集群与我的所有其他机器在同一个 VPC 中,它的安全组允许该 VPC 内的任何传入和传出流量。

  • 但是,它由 Amazon 的 Managed Streaming …

python amazon-web-services apache-kafka kafka-producer-api amazon-msk

7
推荐指数
1
解决办法
1620
查看次数

org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题

我收到错误:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.
Run Code Online (Sandbox Code Playgroud)

当尝试使用 Java 在 Windows 上的本地 kafka 实例中生成主题时。请注意,主题 testtopic2 存在,我可以使用 Windows 控制台生产者向它生成消息就好了。

在我正在使用的代码下方:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; …
Run Code Online (Sandbox Code Playgroud)

java windows apache-kafka kafka-producer-api

7
推荐指数
6
解决办法
2万
查看次数

Kafka:Sarama、幂等性和 transactional.id

Shopify/sarama是否提供类似于transactional.idJVM API的选项?

该库支持幂等(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白如何在没有transactional.id.

如果我错了,请纠正我,Sarama 中缺少有关这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。

我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。

go apache-kafka kafka-producer-api sarama

7
推荐指数
1
解决办法
236
查看次数