在 Kafka 和动态主题中关联

Yog*_*oth 5 apache-kafka kafka-consumer-api

我正在使用 Kafka 构建相关系统。假设有一个服务A执行数据处理,并且有数千个客户端B向它提交作业。B是短暂的,它们出现在网络上,将数据推送到A,然后发生两件重要的事情:

  1. B将立即收到A的状态;
  2. 然后B要么完全退出,保持在线以接收有关状态的进一步更新,要么偶尔会重新打开以检查状态。

(这与网格计算或 mpi 没有什么不同)。

这两点都应该使用一个众所周知的概念来实现correlationIdB拥有一个唯一的 id(在我的例子中是 UUID),它在标头中发送给A,而A又使用它作为Reply-To主题来发送状态更新。这意味着它必须即时创建主题,它们无法预先确定。

我已经auto.create.topics.enable打开了,它确实动态地创建了主题,但是现有的消费者不知道它们并且需要重新启动 [我想是为了获取主题元数据,如果我理解文档正确的话]。我还检查了消费者的metadata.max.age.ms设置,但它似乎没有帮助,即使我将其设置为非常低的值。

据我所知,这还没有得到解答,即:kafka 过滤/动态主题创建kafka 消费者动态检测添加的主题Kafka 生产者可以创建主题和分区吗?或回答不满意。

由于有数百个A和数千个B,我不可能使用共享主题或类似的东西,以免我的网络过载。我可以使用 Kafka'sAdminTools或任何它的名字来预先创建主题,但我觉得它有点傻(即使我看到人们使用它与 Zookeeper 和 Kafka 基础设施本身交谈的现实生活中的例子)。

所以问题是,有没有一种方法可以动态创建 Kafka 主题,让消费者和生产者都知道它而无需重新启动或其他任何事情?而且,在最坏的情况下,AdminTools 真的会帮助它吗?我必须在哪一边使用它 - A还是B

Kafka 0.11, Java 8

更新AdminClient无论出于何种原因, 创建主题都无济于事,LEADER_NOT_AVAILABLE当我尝试订阅时,消费者仍然会抛出。

Yog*_*oth 2

好的,所以我\xe2\x80\x99d 回答我自己的问题。

\n\n
    \n
  1. AdminClient仅当在创建相应的消费者之前执行时,创建主题才有效。
  2. \n
  3. 更改了我的拓扑,考虑到 1) 并在消息头中引入相关 ID 的交换(与 JMS 中相同)。我还必须实现某些拓扑管理方法,将B分组到容器中。
  4. \n
\n\n

需要注意的是,正如很多人所说,这只在B处于单消费者组并监听 1 个分区的主题时才有效。

\n\n

要了解我正在从事的工作,您可以查看我一直在开发的中间件框架https://github.com/ikonkere/magic

\n