带有自定义时间戳的 Kafka Connect.extractor

Ark*_*man 4 apache-kafka apache-kafka-connect confluent-platform

我在尝试将消息从 Kafka 读取到 S3 时遇到了将 jar 添加到 Kafka 连接类路径的问题。

目标是根据时间戳在分区中写入消息,时间戳是 Kafka 消息中 Key 的一部分。

为了使故事简短,我必须提供自定义时间戳提取器。按照此处的文档创建了一个实现TimestampExtractor接口的类并将 JAR 位置添加到plugin.path属性中。

问题是当我开始连接时,找不到类。不知何故,jar 不在类路径中,我得到了

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
Run Code Online (Sandbox Code Playgroud)

附加数据:

版本:融合 4.0.0

连接:连接独立

启动命令:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

Apreaciate任何帮助。

Kon*_*sis 5

要使自定义时间戳提取器类可用于您的 S3 连接器,您将需要以下内容:

  • 添加带有自定义类的 jar 以及其他连接器的依赖项。例子:

    下保存./share/java/kafka-connect-s3,如果你想这是只有在S3中的连接器,或 ./share/java/kafka-connect-storage-common将其提供给所有的存储片连接器(S3目前和HDFS连接器)。

  • 确保您的自定义类实现了该io.confluent.connect.storage.partitioner.TimestampExtractor接口。
  • 当您timestamp.extractor在连接器的配置中设置属性时,请使用完全限定的类名,当然要确保它与您在 jar 中定义和打包的包相匹配。例如:

    timestamp.extractor=me.connectors.MyTimestampExtractor

最后,您将按照类似的过程使自定义分区程序可用于您的连接器。