相关疑难解决方法(0)

Kafka Connect:没有为连接器创建任务

我们正在使用 Debezium (MongoDB) 和 Confluent S3 连接器以分布式模式运行 Kafka Connect(Confluent Platform 5.4,即 Kafka 2.4)。通过 REST API 添加新连接器时,连接器创建为 RUNNING 状态,但不会为连接器创建任何任务。

暂停和恢复连接器无济于事。当我们停止所有工人然后再次启动它们时,任务被创建,一切都按预期运行。

该问题不是由连接器插件引起的,因为我们看到 Debezium 和 S3 连接器的行为相同。同样在调试日志中,我可以看到 Debezium 正确地从 Connector.taskConfigs() 方法返回任务配置。

有人可以告诉我该怎么做,我们可以在不重新启动工作人员的情况下添加连接器吗?谢谢。

配置详情

集群有 3 个节点,具有以下connect-distributed.properties

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000
Run Code Online (Sandbox Code Playgroud)

连接器配置

Debezium 示例:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
1653
查看次数

Kafka接收器连接器:即使重新启动后也没有分配任务

我在一组 Docker 容器中使用 Confluence 3.2,其中一个容器正在运行 kafka-connect 工作线程。

由于我尚不清楚的原因,我的四个连接器中的两个 - 具体来说,hpgraphsl 的MongoDB 接收器连接器- 停止工作。我能够确定主要问题:连接器没有分配任何任务,通过调用 可以看出GET /connectors/{my_connector}/status。其他两个连接器(同一类型)没有受到影响,并且正在愉快地产生输出。

我尝试了三种不同的方法来通过 REST API 让连接器再次运行:

  • 暂停和恢复连接器
  • 重新启动连接器
  • 使用相同的配置删除并创建同名的连接器

所有方法都不起作用。我终于让我的连接器再次工作:

  • 删除连接器并以不同的名称创建连接器,my_connector_v2例如my_connector

这里发生了什么?为什么我无法重新启动现有连接器并让它启动实际任务?kafka-connect 工作线程或 Kafka 代理的某些与 kafka-connect 相关的主题中是否有任何陈旧数据需要清理?

我已经在特定连接器的 github 存储库上提交了一个问题,但我觉得这实际上可能是与 kafka-connect 的内在相关的一般错误。有任何想法吗?

mongodb apache-kafka docker apache-kafka-connect confluent-platform

5
推荐指数
1
解决办法
4781
查看次数

带有 JdbcConnectionSource 连接器的 Kafka Connect 无法创建任务(连接器正在运行但任务不是)

我似乎经常根据查询从 JdbcConnectionSource 创建一个 Kafka Connect 连接器,并且连接器创建成功,状态为“RUNNING”,但没有创建任何任务。查看我的容器的控制台日志,我没有看到任何错误的迹象,我可以说:没有错误,没有警告,没有解释任务失败的原因。我可以让其他连接器工作,但有时不能。

当连接器无法创建 RUNNING 任务时,如何获得更多信息以进行故障排除?

我将在下面发布我的连接器配置示例。

我正在使用 Kafka Connect 5.4.1-ccs。

连接器配置(它是 JDBC 背后的 Oracle 数据库):

{
    "name": "FiscalYear",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=myhost.example.com)(PORT=1521))(LOAD_BALANCE=OFF)(FAILOVER=OFF)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=MY_DB_PRI)(UR=A)))",
        "connection.user":"myuser",
        "connection.password":"mypass",
        "mode": "timestamp",
        "timestamp.column.name": "MAINT_TS",
        "topic.prefix": "MyTeam.MyTopicName",
        "poll.interval.ms": 5000,
        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "numeric.mapping": "best_fit",

        "_comment": "The query is wrapped in `select * from ()` so that JdbcSourceConnector can automatically append a WHERE clause.",
        "query": "SELECT * FROM (SELECT fy_nbr, min(fy_strt_dt) fy_strt_dt, max(fy_end_dt) fy_end_dt FROM myuser.fsc_dt fd WHERE …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

5
推荐指数
1
解决办法
527
查看次数