Apache Kafka Producer Broker连接

spa*_*rkr 10 apache-kafka

我有一组作为集群运行的Kafka代理实例.我有一个向Kafka生成数据的客户端:

props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
Run Code Online (Sandbox Code Playgroud)

当我们使用tcpdump监控时,我可以看到只有与broker1和broker2的连接是ESTABLISHED,而对于broker3,我的生产者没有连接.我只有一个分区的单个主题.

我的问题:

  1. 经纪人数与主题分区之间的关系如何?我应该总是有经纪人数=分数吗?

  2. 为什么在我的情况下,我无法连接到broker3?或至少我的网络监控没有显示我的生产者与broker3建立的连接?

如果我能从生产者的立场深入了解与经纪人的联系是如何工作的,那将是很棒的.

ser*_*jja 28

显然,你的制作人不需要连接到broker3:)

我将尝试向您解释当您向Kafka生成数据时会发生什么:

  1. 你启动了一些经纪人,让我们说3,然后创建一个foo带有2个分区的主题,复制因子2.非常简单的例子,但可能是某个人的真实案例.
  2. 您可以为这些代理配置metadata.broker.list(或bootstrap.servers在新生产者中)创建生产者.值得一提的是,您不一定要指定群集中的所有代理,实际上您只能指定其中的一个代理,它仍然可以工作.我也会解释一下.
  3. foo使用生产者向主题发送消息.
  4. 生产者查找其本地元数据缓存,以查看哪些代理是每个主题分区的领导者,foo以及您的foo主题有多少分区.由于这是第一次发送给生产者,因此本地缓存不包含任何内容.
  5. 生产者按顺序TopicMetadataRequest向每个代理发送一个,metadata.broker.list直到第一次成功响应.这就是为什么我提到该列表中的1个经纪人只要活着就会工作.
  6. 返回TopicMetadataResponse将包含有关所请求主题的信息,在您的情况下,它foo包含集群中的代理.基本上,此响应包含以下内容:
    • 集群中的代理列表,其中每个代理都有ID,主机和端口.此列表可能不包含群集中的整个代理列表,但至少应包含负责为主题主题提供服务的代理列表.
    • 主题元数据列表,其中每个条目都有主题名称,分区数,每个分区的领导者代理ID以及每个分区的ISR代理ID.
  7. 基于TopicMetadataResponse您的生产者构建其本地缓存,现在确切地知道主题foo分区的请求0应该转到代理X.
  8. 根据主题中的分区数量,生产者对您的消息进行分区并累积它,并知道它应作为批处理的一部分发送给某个代理.
  9. 批处理已满或linger.ms超时通过时,生产者将批处理刷新到代理.通过"刷新"我的意思是"打开与代理的新连接或重用现有的连接,然后发送ProduceRequest".

生产者不需要打开与所有代理的不必要的连接,因为您生成的主题可能无法由某些代理提供服务,并且您的群集可能非常大.想象一下1000个代理群集有很多主题,但其中一个主题只有一个分区 - 你只需要一个连接,而不是1000个.

在您的特定情况下,我不能100%确定为什么您有两个开放连接到代理,如果您只有一个分区,但我假设在元数据发现期间打开了一个连接并缓存以便重用,第二个是实际的代理连接产生数据.但是,在这种情况下我可能错了.

但无论如何,根本没有必要为第三个经纪人建立连接.

关于你的问题"我应该总是有多少经纪人=分数?" 答案很可能是没有.如果你解释一下你想要实现的目标,也许我能够指出你正确的方向,但这一点太宽泛而无法解释.我建议阅读此内容以澄清事情.

UPD回答评论中的问题:

元数据缓存更新为2种情况:

  1. 如果生产者由于任何原因未能与代理进行通信 - 这包括代理根本无法访问且代理响应错误的情况(例如"我不再是该分区的领导者,请离开")

  2. 如果没有发生故障,客户端仍然每次刷新元数据metadata.max.age.ms(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42- L43)发现新的经纪人和分区本身.

  • 这解释得很好!万分感谢! (2认同)