如果消息处理失败,则再次使用相同的消息

hav*_*vij 15 c# apache-kafka kafka-consumer-api

我正在使用 Confluent.Kafka .NET 客户端版本 1.3.0。我正在关注文档

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是,即使我注释掉线consumer.StoreOffset(consumerResult);,我不断收到下一个未消费的消息,下一次我消费,即抵消不断增加这似乎并不为它做什么的文件要求,即至少一个传递

即使我EnableAutoCommit = false从配置中设置并删除 'EnableAutoOffsetStore = false',并替换consumer.StoreOffset(consumerResult)consumer.Commit(),我仍然看到相同的行为,即即使我注释掉Commit,我仍然继续收到下一个未使用的消息。

我觉得我在这里缺少一些基本的东西,但不知道是什么。任何帮助表示赞赏!

Tuy*_*ong 0

抱歉,我还不能添加评论。Kafka消费者批量消费消息,所以也许你仍然迭代后台线程预取的批次

您可以使用 kafka util 检查您的消费者是否真正提交了偏移量kafka-consumer-groups.sh

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe
Run Code Online (Sandbox Code Playgroud)