小编ped*_*sen的帖子

Kafka接收器连接器:即使重新启动后也没有分配任务

我在一组 Docker 容器中使用 Confluence 3.2,其中一个容器正在运行 kafka-connect 工作线程。

由于我尚不清楚的原因,我的四个连接器中的两个 - 具体来说,hpgraphsl 的MongoDB 接收器连接器- 停止工作。我能够确定主要问题:连接器没有分配任何任务,通过调用 可以看出GET /connectors/{my_connector}/status。其他两个连接器(同一类型)没有受到影响,并且正在愉快地产生输出。

我尝试了三种不同的方法来通过 REST API 让连接器再次运行:

  • 暂停和恢复连接器
  • 重新启动连接器
  • 使用相同的配置删除并创建同名的连接器

所有方法都不起作用。我终于让我的连接器再次工作:

  • 删除连接器并以不同的名称创建连接器,my_connector_v2例如my_connector

这里发生了什么?为什么我无法重新启动现有连接器并让它启动实际任务?kafka-connect 工作线程或 Kafka 代理的某些与 kafka-connect 相关的主题中是否有任何陈旧数据需要清理?

我已经在特定连接器的 github 存储库上提交了一个问题,但我觉得这实际上可能是与 kafka-connect 的内在相关的一般错误。有任何想法吗?

mongodb apache-kafka docker apache-kafka-connect confluent-platform

5
推荐指数
1
解决办法
4781
查看次数

Kubernetes 上的 Kafka Streams:重新部署后的长期重新平衡

问题

我们使用 StatefulSet 在 Kubernetes 上部署 Scala Kafka Streams 应用程序。这些实例具有单独的applicationIds,因此它们每个都会复制完整的输入主题以实现容错。它们本质上是只读服务,仅读取状态主题并将其写入状态存储,并通过 REST 处理客户请求。这意味着,在任何给定时间,消费者组始终仅包含一个Kafka Streams 实例

现在我们的问题是,当触发滚动重启时,每个实例的启动时间大约需要5分钟,其中大部分时间都花在等待状态上REBALANCING我在这里读到,Kafka Streams 不会发送请求LeaveGroup以便在容器重新启动后快速返回,而无需重新平衡。为什么这对我们不起作用,为什么重新平衡需要这么长时间,即使是applicationId相同的?理想情况下,为了最大限度地减少停机时间,应用程序应立即从重新启动时离开的位置接管。

配置

以下是我们对默认值进行更改的一些配置:

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])    
Run Code Online (Sandbox Code Playgroud)

问题/相关配置

  • 有助于减少session.timeout.ms吗?我们将其设置为相当大的值,因为 Kafka 代理位于不同的数据中心,并且网络连接有时不是超级可靠。
  • 这个答案建议减少max.poll.interval.ms,因为它与重新平衡超时有关。那是对的吗?我犹豫是否要更改此设置,因为它可能会对我们应用程序的正常操作模式产生影响。
  • 有人提到在部署期间延迟重新平衡的配置group.initial.rebalance.delay.ms- 但这也会在从崩溃中恢复后导致延迟,不是吗?
  • 我还偶然发现了KIP-345,它的目标是完全通过 消除消费者对静态会员资格的重新平衡group.instance.id,这非常适合我们的用户案例,但它似乎尚未在我们的经纪商上提供。

我对大量的配置以及如何使用它们来实现更新后的快速恢复感到困惑。谁能解释一下他们是怎么一起玩的?

java apache-kafka kubernetes apache-kafka-streams

5
推荐指数
1
解决办法
2551
查看次数