标签: kafka-consumer-api

fetch-min-size 和 max-poll-records sping kafka 配置无法按预期工作

我正在使用 spring kafka 开发一个 Spring boot 应用程序,该应用程序侦听 kafka 的单个主题,然后隔离各个类别的记录,从中创建一个 json 文件并将其上传到 AWS S3。

我在 Kafka 主题中收到大量数据,我需要确保 json 文件分块得足够大,以限制上传到 S3 的 json 数量。

以下是我application.yml对 kafka 消费者的配置。

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
Run Code Online (Sandbox Code Playgroud)

我创建了一个监听器来连续阅读该主题。

即使使用上述配置,我在控制台中收到的记录如下:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
2万
查看次数

Spring-Kafka 的替代方案(Kafka 库/实现)

我想要 Spring-Kafka 的替代方案的建议。

我在我的应用程序中使用了集成在 Spring 中的 Kafka。我想探索任何可用的替代库。如果比较分析能在图书馆之间共享就好了。

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
998
查看次数

Kafka:将主题偏移量保留在数据库中也是一种好的做法吗?

我已经开始学习kafka了。我对使用卡夫卡的现场项目不太了解。想知道除了在代理中提交之外,偏移量是否可以保存在数据库中?我认为应该始终保存它,否则某些记录将会丢失或重新处理。举个例子,如果偏移量没有保存在数据库中,当应用程序(消费者)在这段时间内部署或重新启动时,如果当时向代理发送了一些消息,则该消息将被错过,因为当消费者启动时,它将读取下一条后续记录或(从开始)

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
2376
查看次数

消费者群体在kafka中如何运作?

您好,我正在研究 kafka CLI,以清楚地了解 kafka 的工作原理。我对消费者群体感到困惑。我创建了具有三个分区的主题。我将创建生产者来向主题提供一些数据。我第一次添加了一些数据,如下所示。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user1
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user2
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user3
Run Code Online (Sandbox Code Playgroud)

现在我的理解是user1,user2,user3会随机去三个不同的分区。

创建消费者组时如下。

kafka-console-consumer --bootstrap-server localhost:9092 --topic users  --group user_group
Run Code Online (Sandbox Code Playgroud)

这将为我提供所有用户1、用户2、用户3。

现在,在一个消费者群体中,我可以拥有许多消费者。如果我在消费者组内有三个消费者,那么第一个消费者将从分区1读取,第二个消费者将从消费者2读取,然后第三个消费者将从消费者3读取。这是我到目前为止的理解。如果我的理解是正确的,那么演示上述行为的 cli 命令是什么?我知道上面提到的只有一个命令会返回所有数据?如果我的上述理解是正确的那么如果所有消费者都需要所有数据那么如何获取它?有人可以帮助我理解这个概念。任何帮助将不胜感激。谢谢

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3097
查看次数

处理kafka消息需要很长时间

我有一个 Python 进程(或者更确切地说,在消费者组中并行运行的一组进程),它根据来自某个主题的 Kafka 消息输入来处理数据。通常每条消息都会很快得到处理,但有时,根据消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka代理会断开客户端与组的连接并启动重新平衡。我可以设置session_timeout_ms一个非常大的值,但大约需要 10 分钟以上,这意味着如果客户端挂掉,集群将在 10 分钟内无法正确重新平衡。这似乎是一个坏主意。此外,大多数消息(大约 98%)都很快,因此为 1-2% 的消息支付这样的惩罚似乎很浪费。OTOH,大消息足够频繁,足以导致大量重新平衡并消耗大量性能(因为当组重新平衡时,什么也没有完成,然后“死”客户端再次重新加入并导致另一次重新平衡)。

那么,我想知道是否还有其他方法来处理需要很长时间才能处理的消息?有没有办法手动启动心跳来告诉代理“没关系,我还活着,我只是在处理消息”?我认为 Python 客户端(我使用的kafka-python 1.4.7)应该为我做这件事,但它似乎没有发生。此外,该 API 似乎根本没有单独的“心跳”功能。据我了解,调用poll()实际上会给我下一条消息——而我什至还没有完成当前的消息,并且还会弄乱 Kafka 消费者的迭代器 API,这在 Python 中使用起来相当方便。

如果很重要的话,如果我没记错的话,Kafka 集群是 Confluence,版本 2.3。

apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
9135
查看次数

client.dns.lookup 选项中的“use_all_dns_ips”和“resolve_canonical_bootstrap_servers_only”之间的确切区别是什么?

在kafka-client 2.1.0中,client.dns.lookup可用。以下是每个选项的说明。

  1. use_all_dns_ips

    当查找返回主机名的多个 IP 地址时,在连接失败之前将尝试连接所有这些 IP 地址

  2. 仅限resolve_canonical_bootstrap_servers_only

    每个条目都将被解析并扩展为规范名称列表

他们不是都使用dns吗?use_all_dns_ipsresolve_canonical_bootstrap_servers_only之间有什么区别?

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

2
推荐指数
1
解决办法
4911
查看次数

Kafka 获取最大字节数未按预期工作

I\xc2\xa0 有一个主题,价值 1 GB 消息。A. Kafka 消费者决定消费这些消息。我该怎么做才能禁止消费者一次消费所有消息?我尝试设置

\n\n

fetch.max.bytes在经纪人上

\n\n

更改为 30 MB,以便每次轮询中仅允许 30 MB 的消息。代理似乎不尊重这一点,并尝试立即向消费者提供所有消息,导致消费者内存不足错误。我该如何解决这个问题?

\n

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
8270
查看次数

Python Kafka消费者读取已读消息

卡夫卡消费者代码 -

def test():
TOPIC = "file_data"
producer = KafkaProducer()
producer.send(TOPIC, "data")
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    consumer_timeout_ms=1000,
    group_id="Group2",
    enable_auto_commit=False,
    auto_commit_interval_ms=1000
)
topic_partition = TopicPartition(TOPIC, 0)
assigned_topic = [topic_partition]
consumer.assign(assigned_topic)
consumer.seek_to_beginning(topic_partition)
for message in consumer:
    print("%s key=%s value=%s" % (message.topic, message.key, message.value))
consumer.commit()
Run Code Online (Sandbox Code Playgroud)

预期行为 它应该只读取生产者写入的最后一条消息。它应该只打印:

file_data key=None value=b'data'
Run Code Online (Sandbox Code Playgroud)

当前行为 运行代码后打印:

file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
7950
查看次数

Kafka 消费者 - 在反序列化之前读取消息头

有没有办法在反序列化之前读取消息头?

我写了下面的代码,但我被迫在这里反序列化,有什么办法不反序列化吗?

    while (true) {

        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            for (Header header : consumerRecord.headers()) {
                if (header.key().equals("my header")) {
                    String data = "\n New record received .. \n" +
                            " Value: " + consumerRecord.value() +
                            " Topic: " + consumerRecord.topic() +
                            " Header: " + header.key() +
                            " Partition: " + consumerRecord.partition();

                    logger.info(data);
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
8074
查看次数

从kafka offset中只获取一条记录

我知道可以通过使用 kafka-console-consumer 中的 --offset 从特定偏移量的 kafka 读取数据,但这给出了从该特定偏移量到当前偏移量的记录。我怎样才能将其限制为只有一条记录。

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1175
查看次数