How do I modify a Kafka connector?

Raj*_*Raj 4 apache-kafka apache-kafka-connect debezium

I created a Debezium connector on Docker using curl in the terminal, but I'm stuck on how to modify the existing connector.

My Docker Compose file:

---
version: '3'
services:

  kafka-connect-02:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect-02
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    command: 
      - bash 
      - -c 
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:0.10.0
        confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.5
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &  

        #
        sleep infinity
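Note that the worker can take a few minutes to become available, because the confluent-hub installs run before it launches. Once it is up, you can check that the plugins were picked up; a quick sanity check, assuming the port mapping above:

# List the plugins the worker has loaded; io.debezium.connector.sqlserver.SqlServerConnector
# should appear in the output once the install has completed
curl -s http://localhost:8083/connector-plugins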

My Debezium connector:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/Procura_CDC/config \
    -d '{
"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":"1",
"database.server.name":"***",
"database.hostname":"***",
"database.port":"***",
"database.user":"Kafka",
"database.password":"***",
"database.dbname":"Procura_Prod",
"database.history.kafka.bootstrap.servers":"*****",
"database.history.kafka.topic":"dbhistory.procura",
"table.whitelist":"dbo.CLIENTS,dbo.VISITS",
"poll.interval.ms":"2000",
"snapshot.fetch.size":"2000",
"snapshot.mode":"initial",
"snapshot.isolation.mode":"snapshot",
"transforms":"unwrap,dropPrefix",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"procura.dbo.(.*)",
"transforms.dropPrefix.replacement":"$1" }'
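As an aside, the two chained transforms here flatten the Debezium change-event envelope and then rename the topics: the RegexRouter strips the procura.dbo. prefix. To illustrate (assuming the topics actually follow that pattern, which depends on the redacted database.server.name):

procura.dbo.CLIENTS -> CLIENTS
procura.dbo.VISITS  -> VISITS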

I get errors when I try to modify the Debezium connector created with the code above; neither the PUT nor the POST method works. I get errors such as curl: (7) Failed to connect to localhost port 8083: Connection refused, curl: (56) Recv failure: Connection reset by peer, and {"error_code":500,"message":"Request timed out"}. Any idea how to modify it? I'm new to Docker, so any help would be appreciated. Thanks.
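With errors like these, a first step is to confirm the worker is actually up and listening before retrying the request; startup is slow while the plugins install. Some basic checks, assuming the container name and port mapping from the Compose file above:

# Is the container running and the port published?
docker ps --filter name=kafka-connect-02

# Does the worker answer on its REST port? (Returns version info when ready)
curl -s http://localhost:8083/

# Watch the worker logs for startup progress or errors
docker logs -f kafka-connect-02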

Rob*_*att 6

You can use PUT to both create and update a connector's configuration. Here's an example:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-file-01/config \
    -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/tmp/totail.txt",
    "topic": "foo",
    "tasks.max": 6
}'    

This creates (or modifies) a connector called source-file-01. If you want to change its configuration, just re-issue the PUT with the necessary values changed.
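For example, to change tasks.max on the connector just created, re-run the same command with only that value updated. Note that PUT replaces the whole configuration, so the rest of the config must still be included:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-file-01/config \
    -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/tmp/totail.txt",
    "topic": "foo",
    "tasks.max": 1
}'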

This ability to re-run the command is why I always prefer PUT over POST for creating connectors: you don't need to change how you run it based on whether or not the connector already exists.
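Either way, you can confirm what the worker currently has with the standard REST endpoints, for example:

# Current configuration of the connector
curl -s http://localhost:8083/connectors/source-file-01/config

# Running state of the connector and its tasks
curl -s http://localhost:8083/connectors/source-file-01/status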