Cam*_*son 2 apache-kafka apache-kafka-connect
当我与控制台制作人一起传输此内容时
{"id":1337,"status":"example_topic_1 success"}
Run Code Online (Sandbox Code Playgroud)
我从我的文件流消费者那里得到这个
{id=1337, status=example_topic_1 success}
Run Code Online (Sandbox Code Playgroud)
这对我来说是一个主要问题,因为如果不假设引号曾经在哪里,就无法恢复原始 JSON 消息。如何将消息输出到文件,同时保留引号?
# sh bin/connect-standalone.sh \
> config/worker.properties \
> config/connect-file-sink-example_topic_1.properties
Run Code Online (Sandbox Code Playgroud)# sh bin/kafka-console-consumer.sh \
> --bootstrap-server kafka_broker:9092 \
> --topic example_topic_1
Run Code Online (Sandbox Code Playgroud)最后,我启动一个控制台生成器来发送消息,然后输入一条消息。
# sh bin/kafka-console-producer.sh \
> --broker-list kafka_broker:9092 \
> --topic example_topic_1
Run Code Online (Sandbox Code Playgroud)
从控制台消费者中,消息会正确弹出,并带有引号。
{"id":1337,"status":"example_topic_1 success"}
Run Code Online (Sandbox Code Playgroud)
但我从 FileStreamSink 消费者那里得到了这个:
{id=1337, status=example_topic_1 success}
Run Code Online (Sandbox Code Playgroud)offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Run Code Online (Sandbox Code Playgroud)
name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
Run Code Online (Sandbox Code Playgroud)
由于您实际上并不想解析 JSON 数据,而只是将其作为文本块直接传递,因此您需要使用 StringConverter:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Run Code Online (Sandbox Code Playgroud)
本文详细解释了转换器的细微差别:https : //rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/ 。这显示了您尝试执行的操作的示例,尽管使用它kafkacat来代替控制台生产者/消费者。
| 归档时间: |
|
| 查看次数: |
1038 次 |
| 最近记录: |