I tried to start a JDBC sink connector with the following configuration:
{
  "name": "crm_data-sink_hh",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 6,
    "topics": "crm_account,crm_competitor,crm_event,crm_event_participation",
    "connection.url": "jdbc:postgresql://db_host/hh?prepareThreshold=0",
    "connection.user": "db_user",
    "connection.password": "${file:db_hh_kafka_connect_pass}",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "guid",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.deadletterqueue.topic.name": "crm_data_deadletterqueue",
    "errors.deadletterqueue.context.headers.enable": true
  }
}
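For reference, a config like this is normally submitted with a POST to the standard /connectors endpoint; a minimal sketch, assuming the JSON above is saved in a hypothetical file crm_data-sink_hh.json and using the kafka-connect:10900 REST endpoint from the status check below:
# sketch: create the connector via the standard Connect REST API (the file name is hypothetical)
$ curl -X POST -H 'Content-Type: application/json' -d @crm_data-sink_hh.json http://kafka-connect:10900/connectors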
But while the connector is in the RUNNING state, no tasks are running:
curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/status
{"name":"crm_data-sink_hh","connector":{"state":"RUNNING","worker_id":"172.16.24.14:10900"},"tasks":[],"type":"sink"}
I have hit this issue several times, and I'm puzzled because it happens randomly. My problem is very similar to this question. I would appreciate any help!
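Besides /status, the standard Connect REST API also exposes the materialized task configs and a connector restart; a minimal sketch using the same host and connector name as above:
# sketch: list the task configs the framework actually created, then restart the connector
$ curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/tasks
$ curl -X POST http://kafka-connect:10900/connectors/crm_data-sink_hh/restart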
Update 11/04/2019 (unfortunately, I only have INFO-level logs at the moment)
Finally, after several attempts, I got the connector crm_data-sink_db_hh to run tasks by updating the configuration of the existing connector:
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl -X PUT -d @new_config.json http://docker21:10900/connectors/crm_data-sink_db_hh/config -H 'Content-Type: application/json'
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"UNASSIGNED","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl …

We are running Kafka Connect in distributed mode (Confluent Platform 5.4, i.e. Kafka 2.4) with Debezium (MongoDB) and the Confluent S3 connector. When we add a new connector via the REST API, the connector is created in the RUNNING state, but no tasks are created for it.
Pausing and resuming the connector does not help. When we stop all the workers and then start them again, the tasks are created and everything runs as expected.
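Pause and resume here refer to the standard REST endpoints; a minimal sketch, assuming the tdp-QA-kafka-connect-001:10083 REST listener from the properties below, with <connector> as a placeholder for the connector name:
# sketch: pause and resume a connector via the standard Connect REST API
$ curl -X PUT http://tdp-QA-kafka-connect-001:10083/connectors/<connector>/pause
$ curl -X PUT http://tdp-QA-kafka-connect-001:10083/connectors/<connector>/resume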
The issue is not caused by the connector plugin, because we see the same behavior with both the Debezium and the S3 connectors. Also, in the debug logs I can see that Debezium correctly returns a task configuration from the Connector.taskConfigs() method.
Can anyone tell me what to do so that we can add connectors without restarting the workers? Thank you.
Configuration details
The cluster has 3 nodes with the following connect-distributed.properties:
bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5
config.storage.topic=connect-configs-qa
config.storage.replication.factor=3
status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083
plugin.path=/opt/kafka-connect/plugins,/usr/share/java/
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>
max.request.size=20000000
max.partition.fetch.bytes=20000000
Connector configuration
Debezium example:
{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": …