Ark*_*man 4 apache-kafka apache-kafka-connect confluent-platform
我在尝试将消息从 Kafka 读取到 S3 时遇到了将 jar 添加到 Kafka 连接类路径的问题。
目标是根据时间戳在分区中写入消息,时间戳是 Kafka 消息中 Key 的一部分。
为了使故事简短,我必须提供自定义时间戳提取器。按照此处的文档创建了一个实现TimestampExtractor接口的类并将 JAR 位置添加到plugin.path属性中。
问题是当我开始连接时,找不到类。不知何故,jar 不在类路径中,我得到了
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
Run Code Online (Sandbox Code Playgroud)
附加数据:
版本:融合 4.0.0
连接:连接独立
启动命令:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \
/home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \
/home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
Apreaciate任何帮助。
要使自定义时间戳提取器类可用于您的 S3 连接器,您将需要以下内容:
添加带有自定义类的 jar 以及其他连接器的依赖项。例子:
下保存./share/java/kafka-connect-s3,如果你想这是只有在S3中的连接器,或
./share/java/kafka-connect-storage-common将其提供给所有的存储片连接器(S3目前和HDFS连接器)。
io.confluent.connect.storage.partitioner.TimestampExtractor接口。当您timestamp.extractor在连接器的配置中设置属性时,请使用完全限定的类名,当然要确保它与您在 jar 中定义和打包的包相匹配。例如:
timestamp.extractor=me.connectors.MyTimestampExtractor
最后,您将按照类似的过程使自定义分区程序可用于您的连接器。
| 归档时间: |
|
| 查看次数: |
991 次 |
| 最近记录: |