Mik*_*ail 4 apache-kafka apache-kafka-connect confluent-platform
我创建了最简单的 kafka 接收器连接器配置,我使用的是 confluence 4.1.0:
{
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "test-type",
"tasks.max": "1",
"topics": "dialogs",
"name": "elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"schema.ignore": "true"
}
Run Code Online (Sandbox Code Playgroud)
在主题中,我将消息保存为JSON
{ "topics": "resd"}
Run Code Online (Sandbox Code Playgroud)
但在结果中我得到一个错误:
引起:org.apache.kafka.common.errors.SerializationException:反序列化 id -1 的 Avro 消息时出错 引起:org.apache.kafka.common.errors.SerializationException:未知的魔术字节!
正如 cricket_007 所说,如果您的数据采用 Json 反序列化器,您需要告诉 Connect 使用 Json 反序列化器。将其添加到您的连接器配置中:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2254 次 |
| 最近记录: |