标签: kafka-consumer-api

需要澄清Kafka自动提交和auto.commit.interval.ms

文档https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html表示"请注意,启用自动提交后,对poll的调用将始终提交由以前的轮询.它不知道实际处理了哪些事件,所以在再次调用poll之前总是处理poll返回的所有事件(或者在调用close()之前,它还将自动提交偏移量).如果是这样的话,如果auto.commit.interval.ms大于处理从前一个接收的消息的时间,它是如何工作的poll().

为了使其更具体,请考虑我有以下情况:

enable.auto.commit=true

auto.commit.interval.ms=10
Run Code Online (Sandbox Code Playgroud)

我打电话给poll()一个循环.

1)在第一次调用时poll(),我得到1000条消息(偏移2000-3000),处理所有1000条消息需要1毫秒

2)我poll()再次打电话.在第二次poll()调用中,它应该提交从前一次返回的最新偏移量3000,poll()但是由于auto.commit.interval.ms设置为10 ms,它不会提交偏移量,对吧?

在这种情况下,提交的偏移量将进一步落后于实际处理的最新偏移量?

有人可以澄清/确认吗?

kafka-consumer-api

15
推荐指数
1
解决办法
1万
查看次数

Kafka在消费者组协调员和消费者组织领导者之间有什么区别?

我看到了对卡夫卡消费者组织协调员和消费者组织领导者的提及......

  1. 有什么不同?

  2. 将集团管理分为两组不同的责任有什么好处?

apache-kafka kafka-consumer-api

15
推荐指数
1
解决办法
7716
查看次数

Kafka制作人TimeoutException:过期1条记录

我正在使用Kafka和Spring-boot:

卡夫卡制片人班:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        }); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

15
推荐指数
1
解决办法
2万
查看次数

如果消息处理失败,则再次使用相同的消息

我正在使用 Confluent.Kafka .NET 客户端版本 1.3.0。我正在关注文档

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult); …
Run Code Online (Sandbox Code Playgroud)

c# apache-kafka kafka-consumer-api

15
推荐指数
1
解决办法
1116
查看次数

如何保存Spark消耗给ZK或Kafka的最新偏移量,并在重启后可以回读

Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.

我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offsetkafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.

我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.

我也尝试设置具体的group.idconsumer.idSpark却是相同的.

如何保存最新的offset消耗,火花zookeeperkafka然后可以从读回最新的offset

apache-kafka apache-spark spark-streaming kafka-consumer-api

14
推荐指数
1
解决办法
1万
查看次数

Kafka - 日志结束偏移(LEO)与高水位(HW)之间的差异

LEO and HWReplica(Leader Replica)有什么区别?

它们会包含相同的数字吗?我能理解HW就是last committed message offset.

当LEO将更新时如何?

apache-kafka kafka-consumer-api kafka-producer-api

14
推荐指数
2
解决办法
8820
查看次数

使用Spark Streaming时限制Kafka批量大小

是否可以限制Kafka消费者为Spark Streaming返回的批次大小?

我问,因为我得到的第一批有数亿条记录,处理和检查它们需要很长时间.

apache-kafka apache-spark spark-streaming kafka-consumer-api

14
推荐指数
2
解决办法
1万
查看次数

了解消费者群体ID

我做了Apache Kafka 0.10.1.0的全新安装.

我能够在命令提示符下发送/接收消息.

使用Producer/Consumer Java示例时,我无法知道Consumer Example上的group.id参数.

让我知道如何解决这个问题.

以下是我用过的消费者示例:

public static void main(String[] args) {
             Properties props = new Properties();
             props.put("bootstrap.servers", "localhost:9092");
             props.put("group.id", "my-topic");
             props.put("enable.auto.commit", "true");
             props.put("auto.commit.interval.ms", "1000");
             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             try {
                 consumer.subscribe(Arrays.asList("my-topic"));

                     ConsumerRecords<String, String> records = consumer.poll(100);
                     System.err.println("records size=>"+records.count());
                     for (ConsumerRecord<String, String> record : records) 
                         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());



              }
             catch (Exception ex){
                 ex.printStackTrace();
             }
            finally {
                 consumer.close();
            }
        }
Run Code Online (Sandbox Code Playgroud)

在为消费者运行命令之后,我可以看到生产者发布的消息(在控制台上).但无法从java程序中看到消息 …

java apache-kafka kafka-consumer-api

14
推荐指数
2
解决办法
3万
查看次数

Kafka Consumer:找不到连接条目

我正在尝试通过使用远程Kafka群集上某个主题的数据来检查kafka使用者。使用时出现以下错误kafka-console-consumer.sh

 ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
Run Code Online (Sandbox Code Playgroud)

这是我使用的命令:

./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

14
推荐指数
2
解决办法
1万
查看次数

如何从特定主题中删除 Kafka 消费者组?

我更改了侦听 Kafka 主题的 Web 服务的消费者组 ID。现在,旧的组ID仍然注册到主题,但是没有具有该组ID的消费者。因此,它是滞后的。如何从特定主题中删除特定消费者组?

我试过这个:

kafka-consumer-groups --bootstrap-server kafka01.myserver.com:9092 --topic notification-topic --delete --group old-consumer-group --execute

但它返回:“消费者不支持从消费者组删除特定于主题的偏移量。

我应该完全删除消费者组吗?我使用相同的组ID收听其他主题,它们会受到影响吗?

consumer apache-kafka kafka-consumer-api

14
推荐指数
1
解决办法
2万
查看次数