Kafka with python:如何将主题发送到 postgreSQL?

Dim*_*hon 5 python postgresql python-3.x apache-kafka

我被敦促将 Kafka 与 python 一起使用。此外,我需要开发一个非常简单的生产者-消费者应用程序,该应用程序实时从设备读取指标,然后将它们发布到 Kafka 中的主题“指标”。然后,消费者必须订阅“metrics”主题并将这些数据存储到 postgreSQL 数据库中。

我尝试在这里绘制架构:

           +-----------+        Fetch metrics every 1 second          +--------------+                                           
           |Biometric  |     {heartrate, oxygen level, temprature}     |              |                                           
           |generation ------------------------------------------------  producer.py |                                           
           |device     |                                              |              |                                           
           +-----------+                                              +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |Publish metrics in "metrics" topic, every 1 second
                                                                              |{heartrate, oxygen level, tempature}              
                                                                              |         JSON format                              
                                                                              |                                                  
                                                                              |                                                  
                                                                      +-------|------+                                           
                                                                      |              |                                           
                                                                      |    KAFKA     |                                           
                                                                      |              |                                           
                                                                      +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              | Subscribe to "metrics" topic and fetch           
                 -                                                            | the JSON every 1 second                          
                                                                              |                                                  
          +-------------+                                              +------|------+                                           
          |             |            Send data to postgreSQL           |             |                                           
          | postgreSQL  ------------------------------------------------ consumer.py |                                           
          |             |                                              |             |                                           
          +-------------+                                              +-------------+                                           
Run Code Online (Sandbox Code Playgroud)

现在,这就是我(卡夫卡零经验)对这个应用程序的想象。我设法把一切都交给消费者。

现在对我来说连接到 postgreSQL 数据库并向其发送这些数据非常容易。但我很困惑。我到处都读到,与此类数据库的连接必须通过 Kafka 连接器(?)进行。手动将消费者收到的数据发送到 postgres 是错误的吗?为什么我要在这里使用“Kafka 连接器”?最后,我不知道有任何 python kafka 连接器,这对我来说更加复杂。

有人可以帮我解决问题吗?

小智 5

如果你想以 JSON 格式将数据推送到 kafka,我最近在这里写了一个简单的例子。

您还可以找到kafka python 文档

对于 Kafka -> PostgreSQL 连接,您可能需要使用Kafka Connect JDBC接收器。Kafka Connect 是一系列预构建的连接器,允许您只需编写配置文件即可从 Kafka 推送或拉取(在 kafka 连接术语中为源或汇)数据,而无需一遍又一遍地编码或重新发明轮子。Kafka connect 不依赖于语言,因为您所需要做的就是将其部署在 Kafka 环境中并正确设置配置文件。

请注意,如果您打算使用 Kafka connect 将数据推送到 PostgreSQL,您可能需要

  • 创建 AVRO 格式的源流
  • 将架构规范添加到您的 JSON 消息中(更多信息请参见此处