小编DVS*_*DVS的帖子

Kafka消费者如何从多个分配的分区中消费

TL;博士; 我试图了解分配了多个分区的单个使用者如何处理消耗分区的记录.

例如:

  • 在移动到下一个分区之前完全处理单个分区.
  • 每次处理每个分区的一大块可用记录.
  • 从第一个可用分区处理一批N条记录
  • 在循环轮换中处理来自分区的一批N条记录

我找到了partition.assignment.strategy配置Ranged或配置器,RoundRobin但这只能确定如何为消费者分配分区,而不是从分配给它的分区中获取分配的方式.

我开始深入研究KafkaConsumer源代码,而 #poll()引导我进入#pollForFetches() #pixForFetches()然后引导我到fetcher#fetchedRecords()fetcher#sendFetches()

这只是让我尝试一起跟随整个Fetcher课程,也许它只是迟到或者我只是没有深入挖掘,但我无法解决消费者如何处理多个指定分区.

背景

处理由Kafka Streams支持的数据管道.

在此管道中的几个阶段,由于记录由不同的Kafka Streams应用程序处理,因此流将连接到由外部数据源提供的压缩主题,外部数据源提供将在继续下一个处理阶段之前在记录中增加的所需数据.

在此过程中,有几个死信主题,其中记录无法与可能增加记录的外部数据源匹配.这可能是因为数据尚未可用(事件或广告系列尚未投放),或者它是不良数据并且永远不会匹配.

目标是在发布新的增强数据时重新发布死信主题中的记录,以便我们可以匹配死信主题中以前不匹配的记录,以便更新它们并将它们发送到下游以进行其他处理.

记录可能无法在多次尝试中匹配,并且可能在死信主题中有多个副本,因此我们只想重新处理现有记录(在应用程序启动时的最新偏移之前)以及发送到死信主题的记录自上次运行应用程序以来(在先前保存的消费者组偏移之后).

它很好用,因为我的消费者过滤掉了应用程序启动后到达的所有记录,我的生产者通过提交偏移作为发布交易的一部分来管理我的消费者群体抵消.

但是我想确保我最终将从所有分区中消耗,因为我遇到了一个奇怪的边缘情况,其中未匹配的记录被重新处理并落入与死信主题中相同的分区,仅被消费者过滤掉.虽然没有获得新批次的记录,但是还有一些分区还没有被重新处理.

任何帮助了解单个消费者如何处理多个分配的分区将不胜感激.

java apache-kafka kafka-consumer-api

7
推荐指数
1
解决办法
984
查看次数

Enum.GetName() 作为常量属性

我已经在 C# 中工作了大约 8 个月,所以如果这很愚蠢,请原谅我......

我有一个枚举,我将在一个类中多次需要字符串值。所以我想使用 Enum.GetName() 将它设置为一个字符串变量,这是没有问题的。我只是这样做...

private string MyEnumString = Enum.GetName(typeof(MyEnum), MyEnum.Name);
Run Code Online (Sandbox Code Playgroud)

它工作得很好。

但是我试图更好地保护它,因为这个特定的 Enum 比所有其他 Enum 更重要,如果我不小心以某种方式更改了字符串值,那将是不好的,所以我试图让它像这样。

private const string MyEnumString = Enum.GetName(typeof(MyEnum), MyEnum.Name);
Run Code Online (Sandbox Code Playgroud)

在我看来,这似乎很好,因为它应该在编译时就知道了。

但是 Visual Studio 2013 抛出一个错误,指出“无法解析符号 GetName”。我知道它在未标记为“const”时有效。

所以这让我有两个关于这个的问题?为什么它会丢失对 GetName 枚举的引用?(经过一些研究,我怀疑这与 GetName 是一种方法而不是 Enum 类的属性有关,但错误消息对我来说没有意义)

最后有没有办法将 MyEnum.Name 的名称读取到一个常量字符串而不是我在做什么?

c# enums compiler-errors constants

6
推荐指数
1
解决办法
1166
查看次数

Kafka Streams 在生成主题时不会将偏移量增加 1

我已经实现了一个简单的 Kafka 死信记录处理器。

当使用控制台生产者产生的记录时,它工作得很好。

但是,我发现我们的 Kafka Streams 应用程序并不能保证为接收器主题生成记录,对于生成的每个记录,偏移量将增加 1。

死信处理器背景:

我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录。当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。

死信处理器:

  • 在运行应用程序开始时记录每个分区的结束偏移量
  • 如果重新处理的记录返回到死信主题,结束偏移量标记停止处理给定死信主题的记录的点,以避免无限循环。
  • 应用程序通过消费者组从上次运行产生的最后一个偏移量恢复。
  • 应用程序正在使用事务并KafkaProducer#sendOffsetsToTransaction提交最后产生的偏移量。

为了跟踪我的范围内的所有记录何时针对某个主题的分区被处理,我的服务将其从生产者的最后产生的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过以下方式暂停该分区KafkaConsumer#pause,当所有分区都暂停时(意味着它们到达保存的结束偏移量),然后调用它退出。

卡夫卡消费者API国:

偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。

卡夫卡生产者API引用下一偏移量始终是+1为好。

将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。

但是您可以在我的调试器中清楚地看到,单个分区消耗的记录一次只增加 1 次... 在此处输入图片说明

我想这可能是 Kafka 配置问题,max.message.bytes但没有一个真正有意义。然后我想也许是因为加入,但没有看到任何会改变制片人运作方式的方式。

不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...

无论生产方法如何,偏移量是否应该始终增加 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?

有什么完全是我遗漏的吗?

java apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

5
推荐指数
1
解决办法
4005
查看次数

使用特定 ID 将架构添加到架构注册表

我们使用 Confluence SchemaRegistry 和 KafkaStreams 已经一年多了,一切都运行良好;直到昨天。

在 UAT 环境中,我们似乎删除了一个架构主题,并且我们的一个应用程序开始故障转移并显示以下消息

[错误] LogAndFailExceptionHandler - 反序列化期间捕获异常,taskId:0_13,主题:TOPIC_NAME,分区:13,偏移量:0 org.apache.kafka.common.errors.SerializationException:检索 id 1531 的 Avro 架构时出错

我检查了架构注册表,发现主题丢失了,并使用curl查询错误中列出的id 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
Run Code Online (Sandbox Code Playgroud)

并回来了:

{"error_code":40403,"message":"Schema not found"}
Run Code Online (Sandbox Code Playgroud)

我天真地只是尝试再次注册架构,没有考虑它,它起作用了,但是注册架构的 id 与之前的 1531 ID 不同。

我需要将架构注册到 ID 1531,因为主题中的现有消息已在魔术字节中包含该 Id 1531。

我在https://docs.confluence.io/current/schema-registry/docs/develop/api.html检查了 API 文档,但没有看到任何用于为模式设置给定 Id 的内容。

无论如何,是否可以使用架构注册表将架构强制指定为特定 ID?

我知道一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失或采取特殊措施来修复主题数据。

java apache-kafka confluent-schema-registry

5
推荐指数
2
解决办法
2971
查看次数