Mar*_*iya 5 jdbc apache-kafka apache-kafka-connect
我每秒收到很多消息(通过 http 协议)(50000 - 100000),并希望将它们保存到 PostgreSql。我决定使用 Kafka JDBC Sink 来实现此目的。
消息以一条记录保存到数据库,而不是批量保存。我想在 PostgreSQL 中批量插入大小为 500-1000 条记录的记录。
我找到了有关此问题的一些答案:How to use batch.size?
我尝试在配置中使用相关选项,但似乎没有任何效果。
我的 Kafka JDBC Sink PostgreSql 配置 ( etc/kafka-connect-jdbc/postgres.properties):
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs
connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false
insert.mode=insert
connection.user=postgres
table.name.format=${topic}
connection.password=pwd
batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000
Run Code Online (Sandbox Code Playgroud)
我还添加了选项connect-distributed.properties:
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
Run Code Online (Sandbox Code Playgroud)
虽然每个分区每秒获取超过 1000 条记录,但记录会被保存到 PostgreSQL 中。
编辑:消费者选项已添加到其他具有正确名称的文件中
我还添加了选项etc/schema-registry/connect-avro-standalone.properties:
# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
Run Code Online (Sandbox Code Playgroud)
我意识到我误解了文档。记录被一一插入到数据库中。一笔事务中插入的记录数取决于batch.size和consumer.max.poll.records。我预计批量插入是以另一种方式实现的。我希望有一个选项来插入这样的记录:
INSERT INTO table1 (First, Last)
VALUES
('Fred', 'Smith'),
('John', 'Smith'),
('Michael', 'Smith'),
('Robert', 'Smith');
Run Code Online (Sandbox Code Playgroud)
但这似乎是不可能的。
| 归档时间: |
|
| 查看次数: |
2790 次 |
| 最近记录: |