标签: apache-kafka-connect

Kafka Connect FileStreamSink 连接器删除 JSON 消息的引号并将冒号更改为等号

概括

当我与控制台制作人一起传输此内容时

{"id":1337,"status":"example_topic_1 success"}
Run Code Online (Sandbox Code Playgroud)

我从我的文件流消费者那里得到这个

/data/example_topic_1.txt

{id=1337, status=example_topic_1 success}
Run Code Online (Sandbox Code Playgroud)

这对我来说是一个主要问题,因为如果不假设引号曾经在哪里,就无法恢复原始 JSON 消息。如何将消息输出到文件,同时保留引号?

细节

  1. 首先,我启动文件接收器连接器。
    # sh bin/connect-standalone.sh \
    >   config/worker.properties \
    >   config/connect-file-sink-example_topic_1.properties
    
    Run Code Online (Sandbox Code Playgroud)
  2. 其次,我启动控制台消费者(也内置于 Kafka),以便我可以轻松地通过视觉确认消息是否正确传递。
    # sh bin/kafka-console-consumer.sh \
    >   --bootstrap-server kafka_broker:9092 \
    >   --topic example_topic_1
    
    Run Code Online (Sandbox Code Playgroud)
  3. 最后,我启动一个控制台生成器来发送消息,然后输入一条消息。

    # sh bin/kafka-console-producer.sh \
    >   --broker-list kafka_broker:9092 \
    >   --topic example_topic_1
    
    Run Code Online (Sandbox Code Playgroud)

    从控制台消费者中,消息会正确弹出,并带有引号。

    {"id":1337,"status":"example_topic_1 success"}
    
    Run Code Online (Sandbox Code Playgroud)

    但我从 FileStreamSink 消费者那里得到了这个:

    /data/example_topic_1.txt

    {id=1337, status=example_topic_1 success}
    
    Run Code Online (Sandbox Code Playgroud)

我的配置

配置/worker.properties

offset.storage.file.filename=/tmp/example.offsets

bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Run Code Online (Sandbox Code Playgroud)

配置/连接文件接收器-example_topic_1.properties

name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

2
推荐指数
1
解决办法
1038
查看次数

无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器

我正在尝试在启用 SSL 的 Kafka 集群中注册 MySql Debezium 连接器。我为此目的使用的卷曲是:

curl -k -X POST -H "Accept:application/json"  -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/  -d '{ "name": "test-eds-extactor-profile", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<DBHOSTNAME>", "database.port": "3306", "database.user": "debezium", "database.password": "*****", "database.server.id": "1", "database.server.name": "MySQL-Database-Docker", "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094", "database.history.kafka.topic": "dbhistory.profile" , "include.schema.changes": "true", "table.whitelist": "test_eds_extraction_src_db_mock.profile", "database.history.producer.security.protocol": "SASL_PLAINTEXT", "database.history.producer.ssl.keystore.location": "path/to/server.jks", "database.history.producer.ssl.keystore.password": "******", "database.history.producer.ssl.truststore.location": "path/to//server.jks", "database.history.producer.ssl.truststore.password": "******", "database.history.producer.ssl.key.password": "******", "database.history.consumer.security.protocol": "SASL_PLAINTEXT", "database.history.consumer.ssl.keystore.location": "path/to/server.jks", "database.history.consumer.ssl.keystore.password": "******", "database.history.consumer.ssl.truststore.location": "path/to/server.jks", "database.history.consumer.ssl.truststore.password": "******", "database.history.consumer.ssl.key.password": "******" } }'
Run Code Online (Sandbox Code Playgroud)

Debezium 无法创建 database.history 主题,失败并出现以下错误:

{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-connect debezium

2
推荐指数
1
解决办法
5931
查看次数

将 dockerized kafka 接收器连接器实现到 mongo

我正在尝试使用docker实现kafka与mongodb和mysql的连接。

我想要的是下图:

Kafka 作为接收器连接到 MongoDB 和 MySQL

卡夫卡连接 MongoDB:

我看过官方mongodb存储库的docker-compose 。它有两个问题:

  1. 对于我的目的来说太复杂了。因为它运行了多个mongodb容器,并且还使用了很多镜像,消耗了如此多的资源。

  2. 它有一些未解决的问题,导致 kafka 到 mongodb 连接出现故障。在这里你可以看到我的问题。

我在 docker-compose.yml 中使用 debezium 进行连接实现的内容如下:

version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13

  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017

  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083 …
Run Code Online (Sandbox Code Playgroud)

mongodb apache-kafka docker apache-kafka-connect

2
推荐指数
1
解决办法
5073
查看次数

Kafka 连接可以使用批量模式的自定义查询吗?

我正在尝试发送 7 天前的每行记录。这是我正在处理的配置,但即使查询在数据库服务器上生成记录,它也不起作用。

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "mode": "bulk",
    "connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
    "query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
    "topic.prefix": "test-jdbc-",
    "poll.interval.ms": 10000
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

2
推荐指数
1
解决办法
3585
查看次数

无法从 EC2 外部连接到 AWS 上运行的 kafka 连接集群

我有一个 ECS 集群,其中有 3 个 EC2 实例,全部位于私有子网中。我创建了一个任务定义来运行 Confluence 提供的 kafka-connect 映像,并使用以下环境变量:

    CONNECT_CONFIG_STORAGE_TOPIC=quickstart-config
    CONNECT_GROUP_ID=quickstart
    CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    CONNECT_OFFSET_STORAGE_TOPIC=quickstart-offsets
    CONNECT_PLUGIN_PATH=/usr/share/java
    CONNECT_REST_ADVERTISED_HOST_NAME=localhost
    CONNECT_REST_ADVERTISED_PORT=8083
    CONNECT_SECURITY_PROTOCOL=SSL
    CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
    CONNECT_STATUS_STORAGE_TOPIC=quickstart-status
    CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
Run Code Online (Sandbox Code Playgroud)

我在此集群前面有一个应用程序负载均衡器,并在端口 8083 上有一个侦听器。我已正确设置目标组以包含运行 kafka-connect 的 EC2 实例。因此负载均衡器应该将请求转发到集群。确实如此,但我总是能得到回复502 Bad Gateway。我可以 ssh 进入 EC2 实例并curl localhost:8083从 kafka-connect 获取响应,但从 EC2 外部,我没有得到响应。

为了排除负载均衡器和集群之间的网络问题,我创建了一个在端口 80 上运行 Nginx 的单独任务定义,并且我能够通过负载均衡器从 EC2 实例外部成功访问它。

我感觉我没有设置CONNECT_REST_ADVERTISED_HOST_NAME正确的值。据我了解,这是客户端应该连接的主机。但是,因为我的 EC2 实例位于私有子网中,所以我不知道将其设置为什么,这就是我将其设置为 localhost 的原因。我尝试将其设置为负载均衡器的 DNS 名称,但这不起作用。

amazon-ecs apache-kafka apache-kafka-connect

2
推荐指数
1
解决办法
2941
查看次数

kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是否可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?

我正在尝试对 Elasticsearch (ES) kafka 连接器使用“write.method”upsert。从我的 kafka 流应用程序中,我正在编写我想要更新插入的文档,该文档位于 ES 连接器配置为读取的 kafka 主题上。我在这个主题上使用 avro 对象作为 kafka 值。我的文档的 AVRO 定义如下所示:

{
  "type": "record",
  "name": "Document",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
    },
    {
      "name": "name",
      "type": ["null", "string"]
    },
    {
      "name": "address",
      "type": ["null", "string"]
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

该文档有时仅包含 ID 和名称,有时仅包含地址。当我只发送地址时,id 和 name 会被覆盖,反之亦然。我已设置behavior.on.null.valuesignore希望 ES 连接器忽略 null id 和 name 值,但这并不能按预期工作。

尽管当我在 kafka 主题上使用两个不同的 AVRO 对象时,第一个仅包含 id 和名称,另一个仅包含地址,upsert 模式行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。

解决当前问题的正确方法是什么?

elasticsearch apache-kafka apache-kafka-connect

2
推荐指数
1
解决办法
1495
查看次数

Debezium Kafka CDC 连接器将密钥设置为 avro,即使转换器是 StringConverver

这是我的连接器配置:

curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-cdc-CUSTOMER_DETAILS-007",
  "config": {
    "tasks.max":"2",
    "poll.interval.ms":"500",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbnode",
    "database.port": "3306",
    "database.user": "**********",
    "database.password": "###########",
    "database.server.name": "dbnode",
    "database.whitelist": "device_details",
    "database.history.kafka.bootstrap.servers": "**********:9092",
    "database.history.kafka.topic": "schema-changes.device_details",
    "include.schema.changes":"true",
    "table.whitelist":"device_details.tb_customermst",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://************:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable":"false",
    "internal.value.converter.schemas.enable":"false"
  }
}' | jq '.'
Run Code Online (Sandbox Code Playgroud)

从ksql消费数据时,显示如下:

ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 …
Run Code Online (Sandbox Code Playgroud)

mysql apache-kafka apache-kafka-connect debezium confluent-platform

2
推荐指数
1
解决办法
2935
查看次数

elasticsearch 连接器不起作用 - java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet

Kafka elasticsearch 连接器“confluenceinc-kafka-connect-elasticsearch-5.5.0”无法在本地工作。

"java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet\n\tat io.searchbox.client.AbstractJestClient.<init>(AbstractJestClient.java:38)\n\tat io.searchbox.client.http.JestHttpClient.<init>(JestHttpClient.java:43)\n\tat io.searchbox.client.JestClientFactory.getObject(JestClientFactory.java:51)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:149)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:141)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
Run Code Online (Sandbox Code Playgroud)

我还在同一路径中使用 mssql 连接器和 s3 连接器插件;它们可以工作,但 elasticsearch 插件给出 noclassfound 错误。这是我在工作人员中的文件夹结构:

[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls
confluentinc-kafka-connect-elasticsearch-5.5.0  confluentinc-kafka-connect-s3-5.5.0  debezium-connector-sqlserver  kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls -l
total 16
drwxrwxr-x 2 root root 4096 May 25 22:15 confluentinc-kafka-connect-elasticsearch-5.5.0
drwxrwxr-x 5 root root 4096 May 15 02:26 confluentinc-kafka-connect-s3-5.5.0
drwxrwxr-x 2 root root 4096 May 15 02:26 debezium-connector-sqlserver
drwxrwxr-x 4 root root 4096 May 15 02:26 kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kubernetes apache-kafka-connect strimzi

2
推荐指数
1
解决办法
2033
查看次数

K8s 上广告的 kafka 连接休息侦听器

我正在努力以分布式模式在 Kubernetes (DockerEE) 上设置 Kafka Connect。
目前,我在三个相应的 k8s-pod 上有一个由三个工作人员组成的集群。
我面临的问题是我的员工之间很难相互沟通(至少我是这么认为的)。

当我尝试启动连接器时,我得到:

{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
Run Code Online (Sandbox Code Playgroud)

作为回应。令人困惑的是我并不总是得到这个错误响应。有时它会起作用并且连接器会按预期启动。

从我读到的内容来看,这可能归结为我配置为 CONNECT_REST_ADVERTISED_HOST_NAME 的内容。

在 k8s 中运行时作为广告地址放置的正确值是多少?

BR

apache-kafka kubernetes apache-kafka-connect

2
推荐指数
1
解决办法
1451
查看次数

有没有办法将 Amazon MSK 主题直接转储到 S3?

我计划使用 Amazon MSK,并且想将消费者日志转储到 S3 。但我没有看到任何选择。我是否需要编写自己的消费者,或者有没有办法直接将 Amazon MSK 消费者输出消费到 s3?

apache-kafka apache-kafka-connect aws-msk

2
推荐指数
1
解决办法
8417
查看次数