Anu*_*ana 7 python-2.7 kafka-python kafka-producer-api
我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)
我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:
import json
from kafka import KafkaProducer
class KafkaSender(object):
def __init__(self):
self.producer = self.get_kafka_producer()
def get_kafka_producer(self):
return KafkaProducer(
bootstrap_servers=['locahost:9092'],
value_serializer=lambda x: json.dumps(x),
request_timeout_ms=2000,
)
def send(self, data):
self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)
我的驱动程序代码是这样的:
from mypackage import KafkaSender
# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)
场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。
场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__。
我将发送行从 更改为
self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。
我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka 生产者。这样,如果我需要发送数百万条消息,我最终可能会创建数百万个 Kafka 生产者对象。
您能帮我理解为什么 Kafka 生产者会这样做吗?
注意:如果我将 Kafka 代码和驱动程序代码写在同一个文件中,它就可以正常工作。仅当我在单独的包中编写 Kafka 代码、编译它并将其导入到我的另一个项目中时,它才起作用。
日志: https: //www.diffchecker.com/dTtm3u2a
更新 1:2020 年 5 月 9 日 17:20:
从问题描述中删除了 INFO 日志。我启用了调试级别,这是第一个场景和第二个场景之间的调试日志之间的差异
https://www.diffchecker.com/dTtm3u2a
更新 2:2020 年 5 月 9 日,21:28:
经过进一步调试和查看 python-kafka 源代码,我能够推断出在场景 1 中,kafka 发送器被强制关闭,而在场景 2 中,kafka 发送器被优雅地关闭。
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
self._running = False
self._accumulator.close()
self.wakeup()
def force_close(self):
"""Closes the sender without sending out any pending messages."""
self._force_close = True
self.initiate_close()
Run Code Online (Sandbox Code Playgroud)
这取决于kafka生产者的close()方法是使用超时0(强制关闭发送者)还是不使用超时(在这种情况下超时取值float('inf')并调用发送者的优雅关闭)来调用。
Kafka生产者的close()方法是从__del__垃圾收集时调用的方法调用的。close(0)方法是从注册的方法调用的,当解释器终止atexit时调用该方法。
问题是为什么在场景 1 中解释器会终止?
| 归档时间: |
|
| 查看次数: |
4994 次 |
| 最近记录: |