调试自定义Kafka连接器的简单有效方法是什么?

C. *_*men 8 java debugging slf4j apache-kafka apache-kafka-connect

我正在使用几个Kafka连接器,我在控制台输出中没有看到它们的创建/部署中的任何错误,但是我没有得到我正在寻找的结果(没有任何结果,无论如何,期望或除此以外).我根据Kafka的示例FileStream连接器制作了这些连接器,因此我的调试技术基于使用示例中使用的SLF4J Logger.我已经搜索了我认为会在控制台输出中生成的日志消息,但无济于事.我在错误的地方找这些消息吗?或者是否有更好的方法来调试这些连接器?

我为实现引用的SLF4J Logger的示例用法:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask

Kon*_*sis 21

我将尝试以广泛的方式回答您的问题.一个简单的连接器开发方法如下:

  • 通过查看公开提供的众多Kafka连接器之一来构建和构建连接器源代码(您可以在此处找到一个广泛的列表:https://www.confluent.io/product/connectors/)
  • https://www.confluent.io/download/下载最新的Confluent Open Source版本(> = 3.3.0)
  • 通过以下方式之一将您的连接器包提供给Kafka Connect:

    1. 将所有连接器jar文件(连接器jar和依赖jar,不包括Connect API jar)存储到文件系统中的某个位置,并通过将此位置添加到plugin.pathConnect worker属性中的属性来启用插件隔离 .例如,如果存储了连接器jar /opt/connectors/my-first-connector,则将设置plugin.path=/opt/connectors工作者的属性(参见下文).
    2. 将所有连接器jar文件存储在下面的文件夹中${CONFLUENT_HOME}/share/java.例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector.(需要以kafka-connect-启动脚本拾取的前缀开头).$ CONFLUENT_HOME是您安装Confluent Platform的地方.
  • (可选)通过更改Connect in 或甚${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties至更改日志级别来增加日志记录.DEBUGTRACE

  • 使用Confluent CLI启动所有服务,包括Kafka Connect.详情请访问:http://docs.confluent.io/current/connect/quickstart.html

    简述: confluent start

注意:CLI当前加载的Connect工作者属性文件是${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties.如果您选择启用类加载隔离,但是如果您需要更改Connect worker的属性,那么您应该编辑该文件.

  • 运行Connect worker后,运行以下命令启动连接器:

    confluent load <connector_name> -d <connector_config.properties>

    要么

    confluent load <connector_name> -d <connector_config.json>

    连接器配置可以是java属性或JSON格式.

  • 运行 confluent log connect以打开Connect worker的日志文件,或直接导航到运行日志和数据的位置

    cd "$( confluent current )"

注意:通过CONFLUENT_CURRENT适当地设置环境变量,在Confluent CLI会话期间更改日志和数据的存储位置.例如,如果/opt/confluent存在并且您要存储数据,请运行:

export CONFLUENT_CURRENT=/opt/confluent
confluent current

  • 最后,要以交互方式调试连接器,可能的方法是在开始使用Confluent CLI连接之前应用以下内容:

    confluent stop connect
    export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
    confluent start connect

    然后连接调试器(例如远程连接到Connect工作器(默认端口:5005).要在调试模式下停止运行连接,只需运行:unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;完成后.

我希望上面的内容能让您的连接器开发更轻松......更有趣!

  • `export KAFKA_DEBUG=y;` 设置 KAFKA_DEBUG 对我有用,而不是 CONNECT_DEBUG。 (2认同)