Abh*_*k B 0 apache-kafka apache-kafka-connect
我正在尝试使用 Kafka connect 使用独立模式写入数据。我将数据写入的主题是具有多个分区。但是,数据仅写入其中一个分区。当我启动多个消费者控制台时,数据仅打印到其中一个。另一个消费者控制台只有在第一个控制台关闭后才能获取任何数据。我无法弄清楚需要在配置文件中进行哪些更改才能使其写入多个分区。
这是standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084
Run Code Online (Sandbox Code Playgroud)
连接文件源.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group
Run Code Online (Sandbox Code Playgroud)
现在我使用以下命令来运行连接器:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
Run Code Online (Sandbox Code Playgroud)
使用以下命令启动消费者控制台:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group
Run Code Online (Sandbox Code Playgroud)
它只将数据打印到消费者控制台之一。但是,如果我使用生产者控制台而不是 Kafka 连接来编写消息,那么我可以看到多个消费者上的消息(以循环方式),应该是这样。但是使用Kafka connect,它只是将所有数据写入单个分区,同一组中的其他消费者必须闲置。需要更改什么才能写入循环系统中的所有分区?
此答案适用于 Apache Kafka 0.10.2.1,但不一定适用于未来版本。
您可能知道,文件源连接器生成带有null键和null主题分区号的消息。这意味着由 Kafka Connect 的生产者使用它的partitioner分配主题分区,对于具有空键的消息,默认分区器将尝试将消息轮询到可用分区。
但是,您遇到了 JSON 转换器的一个怪癖,它是standalone.properties通过key.converter和value.converter属性在文件中配置的:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Run Code Online (Sandbox Code Playgroud)
当 JSON 转换器被配置为启用模式时,JSON 表示会在值周围包含一个信封,以便键或值包含模式和有效负载:
{
"schema": ...,
"payload": ...
}
Run Code Online (Sandbox Code Playgroud)
您的standalone.properties文件将密钥的转换器配置为启用了模式,因此即使连接器生成带有null密钥和null模式的消息,JSON 转换器(启用了模式)也始终将它们包装在一个信封中。因此,每条消息的密钥将是:
{
"schema": null,
"payload": null
}
Run Code Online (Sandbox Code Playgroud)
生产者的默认分区器将始终将这些相同的键散列到同一个分区。
要更改行为,请编辑您的standalone.properties文件并将key.converter.schemas.enable属性更改为false:
key.converter.schemas.enable=false
Run Code Online (Sandbox Code Playgroud)
您可以选择将value.converter.schemas.enable属性更改false为更改值的写入方式,以不将值包装在信封中并包含架构:
value.converter.schemas.enable=false
Run Code Online (Sandbox Code Playgroud)
这也影响了转换器如何处理空值,当具有特定键的源实体被删除时,某些连接器会生成空值。例如,某些变更数据捕获连接器会在从源数据库中删除一行时执行此操作。这适用于日志压缩主题,因为每条消息都代表键控实体的最后一个已知状态,并且因为空值对应于一个墓碑记录,告诉 Kafka 在该墓碑之前具有相同键的所有消息都可以从日志。但是,如果将值转换器配置为启用模式的 JSON 转换器将永远不会输出null消息值,因此日志压缩永远不会删除墓碑消息。这是一个小问题,但需要注意。
如果您想在 JSON 中编码您的键和值,那么您可能不需要或不需要模式,因此可以schemas.enable为它们的键和值 JSON 转换器打开。
对于那些真正使用模式的人,可以考虑使用Confluent 的模式注册表和 Avro 转换器。不仅编码的消息明显更小(由于 Avro 编码而不是 JSON 字符串编码),编码的消息还包括 Avro 模式的 ID,因此允许您随着时间的推移改进您的消息模式,而无需协调升级您的生产者和消费者使用完全相同的模式。各种优点都有!
| 归档时间: |
|
| 查看次数: |
1819 次 |
| 最近记录: |