我试图弄清楚如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后使 es 与我的 kafka 流同步。目前,我首先使用带有模式 = 批量的连接器来执行此操作,然后将其更改为时间戳。这工作正常。
但是,如果我们想将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本来以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 以将模式设置为批量,重新启动所有内容,给出是时候加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样一个脚本的原因是偶尔,批量更新会通过我们尚无法控制的 etl 过程来纠正历史数据,并且此过程不会更新时间戳)
有没有人做类似的事情并找到了更优雅的解决方案?
elasticsearch apache-kafka apache-kafka-connect confluent-platform
我正在使用 Confluent JDBCSourceConnector 从 Oracle 表中读取数据。我正在尝试使用 SMT 生成由 3 个连接字段组成的密钥。
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=BUS_OFC_ID_CD,SO_TYPE,SO_NO
Run Code Online (Sandbox Code Playgroud)
使用上面的转换,我得到了这样的东西:
{"BUS_OFC_ID_CD":"111","SO_TYPE":"I","SO_NO":"55555"}
Run Code Online (Sandbox Code Playgroud)
我想要类似的东西:
111I55555
Run Code Online (Sandbox Code Playgroud)
关于如何仅连接值的任何想法?
oracle jdbc apache-kafka apache-kafka-connect confluent-platform
我想知道 kafka 中消息的压缩大小。
我使用 kafka 1.1.0 和 java kafka-connect 1.1.0 将消息从我的生产者发送到主题。
如果消息对我的制作人来说太大,我会收到
该消息在序列化时为 xxx 字节,大于您使用 max.request.size 配置配置的最大请求大小。
将 max.request.size 设置为合适的值会导致来自代理的错误消息,因为 message.max.bytes 也必须在代理配置中进行相应调整。不幸的是,错误消息不包括代理收到的消息的大小。我调整了 message.max.bytes。到现在为止还挺好。
如果我在生产者端激活压缩,max.request.size 仍然必须与没有压缩的大小相同,因为不幸的是,代码在压缩之前比较了未压缩消息的大小(请参阅https://issues.apache .org/jira/browse/KAFKA-4169 )
但是通过压缩,我将能够减少代理中的 message.max.bytes。问题是我在任何时候都无法确定此压缩消息的大小。有没有办法在发送消息之前或稍后在日志文件中在生产者代码中弄清楚这一点?
在我使用压缩的情况下,message.max.bytes 的默认值 1MB 就足够了,所以我不必更改默认配置。但我想知道我的压缩消息是远低于 1MB 还是只有 0.99MB。在这种情况下,我可能会在生产中增加 message.max.bytes 以避免出现问题。
提前感谢您的支持。
我需要从具有约 2000 个模式的 PostgreSQL 数据库中获取数据。所有模式都包含相同的表(它是一个多租户应用程序)。
连接器配置如下:
{
"name": "postgres-source",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated",
"incrementing.column.name": "id",
"connection.password": "********",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "postgres-source-",
"connection.user": "*********",
"poll.interval.ms": "3600000",
"numeric.mapping": "best_fit",
"connection.url": "jdbc:postgresql://*******:5432/*******",
"table.whitelist": "table1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false"
}
Run Code Online (Sandbox Code Playgroud)
使用此配置,我收到此错误:
“连接器使用非限定表名作为主题名,并检测到重复的非限定表名。这可能导致主题中的混合数据类型和下游处理错误。为防止此类处理错误,JDBC Source 连接器在执行时无法启动检测重复的表名配置”
显然,连接器不想将多个同名表中的数据发布到单个主题。
这对我来说无关紧要,它可以转到单个主题或多个主题(每个模式一个)。
作为附加信息,如果我添加:
"schema.pattern": "schema1"
Run Code Online (Sandbox Code Playgroud)
到配置,连接器工作并且来自指定模式和表的数据被复制。
有没有办法复制包含同名表的多个模式?
谢谢
postgresql jdbc apache-kafka apache-kafka-connect confluent-platform
我想使用 Kafka Connector 将数据从 Kafka 流式传输到 MongoDB。我找到了这个https://github.com/hpgrahsl/kafka-connect-mongodb。但是没有步骤可做。
谷歌搜索后,它似乎导致了我不想使用的Confluent Platform。
任何人都可以分享我的文档/指南,如何在不使用 Confluent 平台或其他 Kafka 连接器的情况下使用kafka-connect-mongodb将数据从 Kafka 流式传输到 MongoDB?
先感谢您。
我试过的
Step1:我mongo-kafka-connect-0.1-all.jar从maven central下载
Step2:将jar文件复制到一个新文件夹plugins里面kafka(我在windows上用的是Kafka,所以目录是D:\git\1.libraries\kafka_2.12-2.2.0\plugins)
步骤 3:connect-standalone.properties通过添加新行来
编辑文件plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins
步骤 4:我为 mongoDB 接收器添加新的配置文件 MongoSinkConnector.properties
name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017,mongo1:27017,mongo2:27017,mongo3:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect
Run Code Online (Sandbox Code Playgroud)
第五步:运行命令 bin\windows\connect-standalone.bat config\connect-standalone.properties config\MongoSinkConnector.properties
但是,我得到了错误
[2019-07-09 10:19:09,466] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't …Run Code Online (Sandbox Code Playgroud)
我试图设置 Kafka Connect 以运行 ElasticsearchSinkConnector。
Kafka 设置,由 3 个使用 Kerberos、SSL 和 ACL 保护的代理组成。
到目前为止,我一直在尝试使用 docker/docker-compose(Confluent docker-image 5.4 with Kafka 2.4)连接到远程 kafka 安装(Kafka 2.0.1 - 实际上是我们的生产环境)运行连接框架和 elasticserch-server 本地)。
KAFKA_OPTS: -Djava.security.krb5.conf=/etc/kafka-connect/secrets/krb5.conf
CONNECT_BOOTSTRAP_SERVERS: srv-kafka-1.XXX.com:9093,srv-kafka-2.XXX.com:9093,srv-kafka-3.XXX.com:9093
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: user-grp
CONNECT_CONFIG_STORAGE_TOPIC: test.internal.connect.configs
CONNECT_OFFSET_STORAGE_TOPIC: test.internal.connect.offsets
CONNECT_STATUS_STORAGE_TOPIC: test.internal.connect.status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: srv-kafka-1.XXX.com:2181,srv-kafka-2.XXX.com:2181,srv-kafka-3.XXX.com:2181
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
principal="<principal>;
CONNECT_SASL_MECHANISM: GSSAPI
CONNECT_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
CONNECT_SSL_TRUSTSTORE_PASSWORD: <PWD> …Run Code Online (Sandbox Code Playgroud) I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:
{
"data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"A"
}
},
"beforeData":{
"Data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"B"
}
}
},
"headers":{
"operation":"UPDATE",
"timestamp":"2018-05-03T13:53:43.000"
}
}
Run Code Online (Sandbox Code Playgroud)
What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers and ignore those under beforeData so that the target table in which the …
我正在尝试在docker的帮助下使用kafka debezium(Kafka流)将一个数据库的表数据下沉到另一个数据库。数据库流工作正常。但流式数据接收另一个 MySQL DB 进程时出现错误。
对于我的连接器接收器配置如下。
{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "mysql-connect.kafka_test.employee",
"connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"errors.tolerance": "all",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "mysql_sink"
}
}
Run Code Online (Sandbox Code Playgroud)
但我收到错误。
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.sql.SQLException: No …Run Code Online (Sandbox Code Playgroud) 如何使用 Kafka Connect 从 kafka 消息中检索传入标头,以使用 MongoDB Sink Connector 到 mongodb 将它们存储为附加数据字段。
我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存消息时间戳、传入消息数据和 mongo 文档创建/更新日期。
我猜想有一个函数可以在某处提取标头。
卡夫卡值示例
// incoming kafka value
{
"msgId" : "exampleId"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_header_foo" : "header_foo_value"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" …Run Code Online (Sandbox Code Playgroud) 我使用 postgres 配置了 debezium,如下代码所示,当我启动 Spring Boot 应用程序时出现错误Creation of replication slot failed
@Bean
public io.debezium.config.Configuration connector() {
return io.debezium.config.Configuration.create()
.with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
.with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "offset.dat")
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 60000)
.with("name", "test-postgres-connector")
.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "172.26.113.123:5455-test")
.with(RelationalDatabaseConnectorConfig.HOSTNAME, "172.26.113.123")
.with(RelationalDatabaseConnectorConfig.PORT, "5455")
.with(RelationalDatabaseConnectorConfig.USER, "test")
.with(RelationalDatabaseConnectorConfig.PASSWORD, "test")
.with(RelationalDatabaseConnectorConfig.DATABASE_NAME, "test")
.with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, "question").build();
}
2021-04-02 12:58:56.817 ERROR 14672 --- [pool-5-thread-1] io.debezium.embedded.EmbeddedEngine : Unable to initialize and start connector's task class 'io.debezium.connector.postgresql.PostgresConnectorTask' with config: {name=student-postgres-connector, connector.class=io.debezium.connector.postgresql.PostgresConnector, database.port=5455, offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore, table.include.list=question, database.user=pfe, database.hostname=172.26.113.123, offset.storage.file.filename=offset.dat, database.password=********, offset.flush.interval.ms=60000, database.server.name=172.26.113.123:5455-pfe, database.dbname=pfe}
io.debezium.DebeziumException: Creation of replication slot …Run Code Online (Sandbox Code Playgroud)