kafka-python:以 0 vs inf 秒超时关闭 kafka 生产者

Anu*_*ana 7 python-2.7 kafka-python kafka-producer-api

我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)

我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:

import json
from kafka import KafkaProducer


class KafkaSender(object):
    def __init__(self):
        self.producer = self.get_kafka_producer()

    def get_kafka_producer(self):
        return KafkaProducer(
            bootstrap_servers=['locahost:9092'],
            value_serializer=lambda x: json.dumps(x),
            request_timeout_ms=2000,
        )

    def send(self, data):
        self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)

我的驱动程序代码是这样的:

from mypackage import KafkaSender

# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)

场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。

场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__
我将发送行从 更改为 self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。

我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka 生产者。这样,如果我需要发送数百万条消息,我最终可能会创建数百万个 Kafka 生产者对象。

您能帮我理解为什么 Kafka 生产者会这样做吗?

注意:如果我将 Kafka 代码和驱动程序代码写在同一个文件中,它就可以正常工作。仅当我在单独的包中编写 Kafka 代码、编译它并将其导入到我的另一个项目中时,它才起作用。

日志: https: //www.diffchecker.com/dTtm3u2a

更新 1:2020 年 5 月 9 日 17:20:
从问题描述中删除了 INFO 日志。我启用了调试级别,这是第一个场景和第二个场景之间的调试日志之间的差异

https://www.diffchecker.com/dTtm3u2a

更新 2:2020 年 5 月 9 日,21:28:
经过进一步调试和查看 python-kafka 源代码,我能够推断出在场景 1 中,kafka 发送器被强制关闭,而在场景 2 中,kafka 发送器被优雅地关闭。

    def initiate_close(self):
        """Start closing the sender (won't complete until all data is sent)."""
        self._running = False
        self._accumulator.close()
        self.wakeup()

    def force_close(self):
        """Closes the sender without sending out any pending messages."""
        self._force_close = True
        self.initiate_close()

Run Code Online (Sandbox Code Playgroud)

这取决于kafka生产者的close()方法是使用超时0(强制关闭发送者)还是不使用超时(在这种情况下超时取值float('inf')并调用发送者的优雅关闭)来调用。

Kafka生产者的close()方法是从__del__垃圾收集时调用的方法调用的。close(0)方法是从注册的方法调用的,当解释器终止atexit时调用该方法。 问题是为什么在场景 1 中解释器会终止?