我使用HTTP PUT方法编写了一个上传文件的服务.
Web浏览器不支持PUT,因此我需要一种测试方法.它可以很好地用作从浏览器访问它的POST.
更新:这是有效的.我尝试了海报,但它与使用提琴手一样.您必须知道如何构建请求.卷曲处理问题.
curl -X PUT"localhost:8080/urlstuffhere"-F"file = @ filename"-b"JSESSIONID = cookievalue"
我尝试使用以下配置启动 JDBC 接收器连接器:
{
"name": "crm_data-sink_hh",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 6,
"topics": "crm_account,crm_competitor,crm_event,crm_event_participation",
"connection.url": "jdbc:postgresql://db_host/hh?prepareThreshold=0",
"connection.user": "db_user",
"connection.password": "${file:db_hh_kafka_connect_pass}",
"dialect.name": "PostgreSqlDatabaseDialect",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "guid",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
"errors.deadletterqueue.context.headers.enable":true
}
}
Run Code Online (Sandbox Code Playgroud)
但是当连接器处于运行状态时没有任务正在运行:
curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/status
{"name":"crm_data-sink_hh","connector":{"state":"RUNNING","worker_id":"172.16.24.14:10900"},"tasks":[],"type":"sink"}
Run Code Online (Sandbox Code Playgroud)
我多次遇到这个问题,但我很困惑,因为它是随机发生的。我的问题和这个问题非常相似。我将不胜感激任何帮助!
更新。11/04/2019(不幸的是,现在我只有INFO级别的日志)
最后,经过几次尝试,我通过更新现有连接器的配置来启动连接器来运行任务crm_data-sink_db_hh:
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl -X PUT -d @new_config.json http://docker21:10900/connectors/crm_data-sink_db_hh/config -H 'Content-Type: application/json'
$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"UNASSIGNED","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}
$ curl …Run Code Online (Sandbox Code Playgroud) 我尝试使用errors.tolerance: all选项忽略接收器连接器中的错误消息。完整的连接器配置:
{
"name": "crm_data-sink_pandora",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 6,
"topics": "crm_account_detail,crm_account_on_competitors,crm_event,crm_event_participation",
"connection.url": "jdbc:postgresql://dburl/service?prepareThreshold=0",
"connection.user": "pandora.app",
"connection.password": "*******",
"dialect.name": "PostgreSqlDatabaseDialect",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "guid",
"table.name.format": "pandora.${topic}",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
"errors.deadletterqueue.context.headers.enable":true
}
}
Run Code Online (Sandbox Code Playgroud)
目标表DDL:
create table crm_event_participation
(
guid char(36) not null
constraint crm_event_participation_pkey
primary key,
created_on timestamp,
created_by_guid char(36),
modified_on timestamp,
modified_by_guid char(36),
process_listeners integer,
event_guid char(36),
event_response varchar(250),
note varchar(500),
is_from_group boolean,
contact_guid char(36),
target_item integer,
account_guid char(36),
employer_id integer
);
Run Code Online (Sandbox Code Playgroud)
连接器成功启动,但如果发生错误(例如缺少字段),则会失败。 …
我成功安装了Postgres Debezium CDC。现在,我能够捕获数据库发生的所有更改。但问题是“之前”字段始终为空。因此,如果我插入一条记录,(id = 1, name = Bill)我就会从 Kafka 获取以下数据:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)
但如果我像这样更新记录:
UPDATE mytable set name = 'Bob' WHERE id = 1
Run Code Online (Sandbox Code Playgroud)
我从卡夫卡那里得到这个:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
Run Code Online (Sandbox Code Playgroud)
这就是我配置连接器的方式:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)
这是什么问题?我该如何解决?
"errors.deadletterqueue.topic.name"适用于源连接器吗?我使用JDBC 接收器连接器进行了测试,它可以工作,但我没有找到有序列化错误的记录进入死信队列。
我使用Debezium Connector for MongoDB,apache-kafka-connect版本是 2.4.0。
其余错误处理配置:
"errors.tolerance": "all",
"errors.log.enable": "false",
"errors.deadletterqueue.topic.name": "test-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
Run Code Online (Sandbox Code Playgroud) 我计划使用Debezium来制作活动,Kafka并且我需要强有力的交付和订购保证。通过在生产者配置中使用enable.idempotence=true参数,我可以获得这些保证。
我的问题是:
我正在使用 a 从mysql数据库表中读取数据Kafka Source JDBC connector并将其发布到主题test-mysql-petai。
数据库表有 2 个字段,其中Id是主键:
+---------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(20) | YES | | NULL | |
+---------+-------------+------+-----+---------+----------------+
Run Code Online (Sandbox Code Playgroud)
我需要该字段的值id作为主题的键。我尝试向 jdbc 连接器属性添加转换。
JDBCConnector.属性:
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=dins&password=pw&serverTimezone=UTC
table.whitelist=petai
mode=incrementing
incrementing.column.name=id
schema.pattern=""
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
topic.prefix=test-mysql-jdbc-
Run Code Online (Sandbox Code Playgroud)
但是,当我使用消费者读取键和值时,我得到以下信息:
Key = …Run Code Online (Sandbox Code Playgroud) 我使用 debezium cdc connect pg,我构建了 docker 使用的 pg 11?pg 运行良好。当我在 kafka 连接器中使用 debezium 时?它报告?
无法获得数据库测试的编码
卷曲是:
curl -H "Accept: application/json" -H "Content-type: application/json" -X POST http://localhost:8083/connectors/ -d '{
"name": "debezium",
"config": {
"name": "debezium",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.dbname": "test",
"database.user": "pg",
"database.password": "135790",
"database.server.name": "ls",
"table.whitelist": "public.test",
"plugin.name": "pgoutput"
}
}'
Run Code Online (Sandbox Code Playgroud)
卡夫卡例外是:
[2020-07-08 09:24:35,076] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.RuntimeException: Couldn't obtain encoding for database test
at io.debezium.connector.postgresql.connection.PostgresConnection.determineDatabaseCharset(PostgresConnection.java:434)
at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:77)
at …Run Code Online (Sandbox Code Playgroud)