当我与控制台制作人一起传输此内容时
{"id":1337,"status":"example_topic_1 success"}
Run Code Online (Sandbox Code Playgroud)
我从我的文件流消费者那里得到这个
{id=1337, status=example_topic_1 success}
Run Code Online (Sandbox Code Playgroud)
这对我来说是一个主要问题,因为如果不假设引号曾经在哪里,就无法恢复原始 JSON 消息。如何将消息输出到文件,同时保留引号?
# sh bin/connect-standalone.sh \
> config/worker.properties \
> config/connect-file-sink-example_topic_1.properties
Run Code Online (Sandbox Code Playgroud)# sh bin/kafka-console-consumer.sh \
> --bootstrap-server kafka_broker:9092 \
> --topic example_topic_1
Run Code Online (Sandbox Code Playgroud)最后,我启动一个控制台生成器来发送消息,然后输入一条消息。
# sh bin/kafka-console-producer.sh \
> --broker-list kafka_broker:9092 \
> --topic example_topic_1
Run Code Online (Sandbox Code Playgroud)
从控制台消费者中,消息会正确弹出,并带有引号。
{"id":1337,"status":"example_topic_1 success"}
Run Code Online (Sandbox Code Playgroud)
但我从 FileStreamSink 消费者那里得到了这个:
{id=1337, status=example_topic_1 success}
Run Code Online (Sandbox Code Playgroud)offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Run Code Online (Sandbox Code Playgroud)
name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
Run Code Online (Sandbox Code Playgroud) 我正在尝试在启用 SSL 的 Kafka 集群中注册 MySql Debezium 连接器。我为此目的使用的卷曲是:
curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/ -d '{ "name": "test-eds-extactor-profile", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<DBHOSTNAME>", "database.port": "3306", "database.user": "debezium", "database.password": "*****", "database.server.id": "1", "database.server.name": "MySQL-Database-Docker", "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094", "database.history.kafka.topic": "dbhistory.profile" , "include.schema.changes": "true", "table.whitelist": "test_eds_extraction_src_db_mock.profile", "database.history.producer.security.protocol": "SASL_PLAINTEXT", "database.history.producer.ssl.keystore.location": "path/to/server.jks", "database.history.producer.ssl.keystore.password": "******", "database.history.producer.ssl.truststore.location": "path/to//server.jks", "database.history.producer.ssl.truststore.password": "******", "database.history.producer.ssl.key.password": "******", "database.history.consumer.security.protocol": "SASL_PLAINTEXT", "database.history.consumer.ssl.keystore.location": "path/to/server.jks", "database.history.consumer.ssl.keystore.password": "******", "database.history.consumer.ssl.truststore.location": "path/to/server.jks", "database.history.consumer.ssl.truststore.password": "******", "database.history.consumer.ssl.key.password": "******" } }'
Run Code Online (Sandbox Code Playgroud)
Debezium 无法创建 database.history 主题,失败并出现以下错误:
{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用docker实现kafka与mongodb和mysql的连接。
我想要的是下图:
卡夫卡连接 MongoDB:
我看过官方mongodb存储库的docker-compose 。它有两个问题:
对于我的目的来说太复杂了。因为它运行了多个mongodb容器,并且还使用了很多镜像,消耗了如此多的资源。
它有一些未解决的问题,导致 kafka 到 mongodb 连接出现故障。在这里你可以看到我的问题。
我在 docker-compose.yml 中使用 debezium 进行连接实现的内容如下:
version: '3.2'
services:
kafka:
image: wurstmeister/kafka:latest
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_LOG_DIRS: /kafka/logs
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka:/kafka
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
volumes:
- zookeeper:/opt/zookeeper-3.4.13
mongo:
image: mongo
container_name: mongo
ports:
- 27017:27017
connect:
image: debezium/connect
container_name: connect
ports:
- 8083:8083 …Run Code Online (Sandbox Code Playgroud) 我正在尝试发送 7 天前的每行记录。这是我正在处理的配置,但即使查询在数据库服务器上生成记录,它也不起作用。
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": 1,
"mode": "bulk",
"connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
"query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
"topic.prefix": "test-jdbc-",
"poll.interval.ms": 10000
}Run Code Online (Sandbox Code Playgroud)
我有一个 ECS 集群,其中有 3 个 EC2 实例,全部位于私有子网中。我创建了一个任务定义来运行 Confluence 提供的 kafka-connect 映像,并使用以下环境变量:
CONNECT_CONFIG_STORAGE_TOPIC=quickstart-config
CONNECT_GROUP_ID=quickstart
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_OFFSET_STORAGE_TOPIC=quickstart-offsets
CONNECT_PLUGIN_PATH=/usr/share/java
CONNECT_REST_ADVERTISED_HOST_NAME=localhost
CONNECT_REST_ADVERTISED_PORT=8083
CONNECT_SECURITY_PROTOCOL=SSL
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
CONNECT_STATUS_STORAGE_TOPIC=quickstart-status
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
Run Code Online (Sandbox Code Playgroud)
我在此集群前面有一个应用程序负载均衡器,并在端口 8083 上有一个侦听器。我已正确设置目标组以包含运行 kafka-connect 的 EC2 实例。因此负载均衡器应该将请求转发到集群。确实如此,但我总是能得到回复502 Bad Gateway。我可以 ssh 进入 EC2 实例并curl localhost:8083从 kafka-connect 获取响应,但从 EC2 外部,我没有得到响应。
为了排除负载均衡器和集群之间的网络问题,我创建了一个在端口 80 上运行 Nginx 的单独任务定义,并且我能够通过负载均衡器从 EC2 实例外部成功访问它。
我感觉我没有设置CONNECT_REST_ADVERTISED_HOST_NAME正确的值。据我了解,这是客户端应该连接的主机。但是,因为我的 EC2 实例位于私有子网中,所以我不知道将其设置为什么,这就是我将其设置为 localhost 的原因。我尝试将其设置为负载均衡器的 DNS 名称,但这不起作用。
我正在尝试对 Elasticsearch (ES) kafka 连接器使用“write.method”upsert。从我的 kafka 流应用程序中,我正在编写我想要更新插入的文档,该文档位于 ES 连接器配置为读取的 kafka 主题上。我在这个主题上使用 avro 对象作为 kafka 值。我的文档的 AVRO 定义如下所示:
{
"type": "record",
"name": "Document",
"fields": [
{
"name": "id",
"type": ["null", "string"],
},
{
"name": "name",
"type": ["null", "string"]
},
{
"name": "address",
"type": ["null", "string"]
}
]
}
Run Code Online (Sandbox Code Playgroud)
该文档有时仅包含 ID 和名称,有时仅包含地址。当我只发送地址时,id 和 name 会被覆盖,反之亦然。我已设置behavior.on.null.values为ignore希望 ES 连接器忽略 null id 和 name 值,但这并不能按预期工作。
尽管当我在 kafka 主题上使用两个不同的 AVRO 对象时,第一个仅包含 id 和名称,另一个仅包含地址,upsert 模式行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。
解决当前问题的正确方法是什么?
这是我的连接器配置:
curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
"name": "mysql-cdc-CUSTOMER_DETAILS-007",
"config": {
"tasks.max":"2",
"poll.interval.ms":"500",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "dbnode",
"database.port": "3306",
"database.user": "**********",
"database.password": "###########",
"database.server.name": "dbnode",
"database.whitelist": "device_details",
"database.history.kafka.bootstrap.servers": "**********:9092",
"database.history.kafka.topic": "schema-changes.device_details",
"include.schema.changes":"true",
"table.whitelist":"device_details.tb_customermst",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://************:8081",
"internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable":"false",
"internal.value.converter.schemas.enable":"false"
}
}' | jq '.'
Run Code Online (Sandbox Code Playgroud)
从ksql消费数据时,显示如下:
ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 …Run Code Online (Sandbox Code Playgroud) mysql apache-kafka apache-kafka-connect debezium confluent-platform
Kafka elasticsearch 连接器“confluenceinc-kafka-connect-elasticsearch-5.5.0”无法在本地工作。
"java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet\n\tat io.searchbox.client.AbstractJestClient.<init>(AbstractJestClient.java:38)\n\tat io.searchbox.client.http.JestHttpClient.<init>(JestHttpClient.java:43)\n\tat io.searchbox.client.JestClientFactory.getObject(JestClientFactory.java:51)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:149)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:141)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
Run Code Online (Sandbox Code Playgroud)
我还在同一路径中使用 mssql 连接器和 s3 连接器插件;它们可以工作,但 elasticsearch 插件给出 noclassfound 错误。这是我在工作人员中的文件夹结构:
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls
confluentinc-kafka-connect-elasticsearch-5.5.0 confluentinc-kafka-connect-s3-5.5.0 debezium-connector-sqlserver kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls -l
total 16
drwxrwxr-x 2 root root 4096 May 25 22:15 confluentinc-kafka-connect-elasticsearch-5.5.0
drwxrwxr-x 5 root root 4096 May 15 02:26 confluentinc-kafka-connect-s3-5.5.0
drwxrwxr-x 2 root root 4096 May 15 02:26 debezium-connector-sqlserver
drwxrwxr-x 4 root root 4096 May 15 02:26 kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r …Run Code Online (Sandbox Code Playgroud) 我正在努力以分布式模式在 Kubernetes (DockerEE) 上设置 Kafka Connect。
目前,我在三个相应的 k8s-pod 上有一个由三个工作人员组成的集群。
我面临的问题是我的员工之间很难相互沟通(至少我是这么认为的)。
当我尝试启动连接器时,我得到:
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
Run Code Online (Sandbox Code Playgroud)
作为回应。令人困惑的是我并不总是得到这个错误响应。有时它会起作用并且连接器会按预期启动。
从我读到的内容来看,这可能归结为我配置为 CONNECT_REST_ADVERTISED_HOST_NAME 的内容。
在 k8s 中运行时作为广告地址放置的正确值是多少?
BR
我计划使用 Amazon MSK,并且想将消费者日志转储到 S3 。但我没有看到任何选择。我是否需要编写自己的消费者,或者有没有办法直接将 Amazon MSK 消费者输出消费到 s3?
apache-kafka ×10
debezium ×2
kubernetes ×2
amazon-ecs ×1
aws-msk ×1
docker ×1
java ×1
mongodb ×1
mysql ×1
strimzi ×1