KafkaConsumer Java API Subscribe()与Assign()

Kar*_*nna 6 java apache-kafka kafka-consumer-api

我是Kafka Java API的新手,正在研究使用特定Kafka主题的记录。

我知道我可以使用方法subscribe()从该主题开始轮询记录。assign()如果我要从选定主题的分区开始轮询记录,Kafka还提供了方法。

我想知道这是否是两者之间的唯一区别?

Dea*_*ool 11

是的subscribe需要group.id,因为组中的每个消费者可以动态地设置它想通过认购APIs.Kafka的一个订阅将各消费群体在订阅的主题每封邮件传送到一个进程的主题列表。这是通过平衡使用者组中所有成员之间的分区来实现的,以便将每个分区分配给组中的一个使用者

assign将为此用户手动分配分区列表。并且此方法不使用使用者的组管理功能(无需group.id

主要区别是assign(Collection)将使控制器失去对动态分区分配和使用者组协调的控制

使用者也可以使用assign(Collection)手动分配特定分区(类似于较早的“简单”使用者)。在这种情况下,动态分区分配和使用者组协调将被禁用。

订阅

public void subscribe(java.util.Collection<java.lang.String> topics)
Run Code Online (Sandbox Code Playgroud)

订阅方法订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其与unsubscribe().

作为组管理的一部分,使用方将跟踪属于特定组的使用方列表,并在以下事件之一触发时触发重新平衡操作:

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API
Run Code Online (Sandbox Code Playgroud)

分配

public void assign(java.util.Collection<TopicPartition> partitions)
Run Code Online (Sandbox Code Playgroud)

Assign方法手动为此用户分配分区列表。如果给定的主题分区列表为空,则将其与unsubscribe()相同。

通过这种方法手动分配主题不使用使用者的组管理功能。这样,当组成员身份或群集和主题元数据发生更改时,将不会触发任何重新平衡操作。

  • 值得注意的是,assign() 可以使用 group.id 来存储提交的偏移量。拥有一个具有自己专用的 group.id 的 allocate() 消费者来跟踪自己的偏移量而不需要与其他消费者进行任何协调是有效的。在某些情况下这样做是有好处的。 (4认同)