confluent-kafka-python: how do I make the initial connection time out when the broker is unavailable?

Art*_*nov 6 apache-kafka kafka-consumer-api kafka-python confluent-platform

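I am using confluent-kafka-python, and when I try to connect to a broker that is down, it hangs indefinitely. None of the timeout settings I found in the documentation seem to have any effect: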

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': f"{self.host}:{self.port}",
    'group.id': "foo",
    'auto.offset.reset': 'smallest',
    'socket.timeout.ms': '2000',
    'socket.max.fails': 2,
    'metadata.request.timeout.ms': 5000,
    'reconnect.backoff.max.ms': '5000',
    'api.version.request.timeout.ms': '5000',
    # api.version.fallback.ms
    'session.timeout.ms': '2000',
    # heartbeat.interval.ms
    'coordinator.query.interval.ms': '1000',
    # max.poll.interval.ms
    # auto.commit.interval.ms
    "debug": "generic, broker, topic, metadata",
}

try:
     self.consumer = Consumer(conf)

This is what I get in the log:

%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1584702589.067|BROKERFAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1584702589.067|FAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connect to ipv4#x.x.x.x:6667 failed: Connection refused (after 1ms in state CONNECT) 
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state CONNECT -> DOWN
%7|1584702589.067|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state DOWN -> INIT


It goes on like this forever. Even Ctrl-C does not stop it, and I have to use kill.
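The `application metadata request` entries in the log hint that the hang may sit in a blocking metadata call made by the application (for example `list_topics()`, whose `timeout` defaults to `-1`, i.e. wait forever) rather than in `Consumer(conf)` itself. That is only a guess from the log. A minimal sketch of bounding such a call, where `probe_cluster` is a hypothetical helper name and `client` is assumed to be a confluent_kafka `Consumer`, `Producer` or `AdminClient`:

```python
def probe_cluster(client, timeout_s=5.0):
    """Return cluster metadata, or raise ConnectionError if none arrives in time.

    `client` is assumed to expose list_topics(timeout=...), as the
    confluent_kafka Consumer, Producer and AdminClient do.
    `probe_cluster` is a hypothetical helper, not part of the library.
    """
    try:
        # A finite timeout makes the metadata request give up instead of
        # blocking indefinitely while the broker is unreachable.
        return client.list_topics(timeout=timeout_s)
    except Exception as exc:  # confluent_kafka raises KafkaException on timeout
        raise ConnectionError(
            f"no cluster metadata within {timeout_s}s: {exc}"
        ) from exc
```

Usage would be roughly `probe_cluster(self.consumer, timeout_s=5.0)` right after constructing the consumer, treating `ConnectionError` as "broker unavailable".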

I used kafka-python before, and it did fail when it could not connect to the broker.

Is there a setting in confluent-kafka-python that makes it fail when the broker is unavailable?
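For context: librdkafka appears to treat connection failures as transient and keeps retrying, so the decision to give up may have to live in the application. confluent-kafka-python accepts an `error_cb` configuration entry that is called with a `KafkaError`. A rough sketch of counting connectivity errors and bailing out after a few of them follows. `ConnectionWatchdog` and `max_failures` are hypothetical names, and the set of error names treated as connectivity failures is an assumption:

```python
class ConnectionWatchdog:
    """Counts connectivity errors reported through error_cb.

    Hypothetical helper: pass an instance as conf['error_cb'] and call
    check() between polls. Errors are assumed to behave like
    confluent_kafka.KafkaError, i.e. to provide a name() method.
    """

    # Assumed set of librdkafka error names that indicate connectivity trouble.
    CONNECTIVITY_ERRORS = {"_ALL_BROKERS_DOWN", "_TRANSPORT", "_RESOLVE"}

    def __init__(self, max_failures=3):
        self.max_failures = max_failures
        self.failures = 0
        self.last_error = None

    def __call__(self, err):
        # Invoked by the client with a KafkaError-like object.
        if err.name() in self.CONNECTIVITY_ERRORS:
            self.failures += 1
            self.last_error = err

    def check(self):
        # Raise once enough connectivity errors have been observed.
        if self.failures >= self.max_failures:
            raise ConnectionError(
                f"giving up after {self.failures} connectivity errors "
                f"(last: {self.last_error.name()})"
            )
```

The idea would be to add `'error_cb': watchdog` to `conf` and call `watchdog.check()` after each `consumer.poll(1.0)`. As far as I can tell the callback is only served from `poll()`, so it would not interrupt a call that is already blocking without a timeout.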

I have seen the question "Kafka Streams Broker Connection Timeout Settings" and would like to know whether my case is the same.

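Update: another related problem. On a dual-homed node, if I configure advertised.listeners with the internal address and try to send messages from the external interface, I run into an endless loop again: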

%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1584727507.340|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "de-cn1:6667"
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1584727507.340|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: broker in state TRY_CONNECT connecting
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.359|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1584727507.360|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN

So it cannot resolve the internal name de-cn1, and yet it does not fail. Same question: how do I make it fail on this kind of error?
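One way to at least detect this case up front might be to inspect the advertised broker addresses in the cluster metadata and check that each host resolves from the client machine. This is a sketch under assumptions: `unresolvable_brokers` is a hypothetical helper, and `metadata` is assumed to look like the `ClusterMetadata` object returned by `list_topics()`, with a `.brokers` dict whose values have `.host` and `.port`:

```python
import socket


def unresolvable_brokers(metadata, resolve=socket.getaddrinfo):
    """Return sorted 'host:port' strings for advertised brokers that do not resolve.

    `metadata.brokers` is assumed to map broker id -> object with
    `.host` and `.port`. `resolve` is injectable so the check can be
    exercised without real DNS lookups.
    """
    bad = []
    for broker in metadata.brokers.values():
        try:
            resolve(broker.host, broker.port)
        except socket.gaierror:
            # The advertised name (e.g. an internal hostname) is unknown here.
            bad.append(f"{broker.host}:{broker.port}")
    return sorted(bad)
```

If the returned list is non-empty, the application could raise its own error instead of letting the client retry against a name such as `de-cn1:6667` that it can never reach.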