Art*_*nov 6 apache-kafka kafka-consumer-api kafka-python confluent-platform
我正在使用confluent-kafka-python,当我尝试连接到已关闭的代理时,发现它无限挂起。我似乎无法应用在文档中找到的任何超时设置:
from confluent_kafka import Consumer
conf = {'bootstrap.servers': f"{self.host}:{self.port}",
'group.id': "foo",
'auto.offset.reset': 'smallest',
'socket.timeout.ms':'2000', 'socket.max.fails':2,
'metadata.request.timeout.ms': 5000,
'reconnect.backoff.max.ms':'5000',
'api.version.request.timeout.ms':'5000',
#api.version.fallback.ms
'session.timeout.ms':'2000',
#heartbeat.interval.ms
'coordinator.query.interval.ms':'1000',
#max.poll.interval.ms
#auto.commit.interval.ms,
"debug":"generic, broker, topic, metadata",
}
try:
self.consumer = Consumer(conf)
Run Code Online (Sandbox Code Playgroud)
我在日志中得到:
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1584702589.067|BROKERFAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1584702589.067|FAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connect to ipv4#x.x.x.x:6667 failed: Connection refused (after 1ms in state CONNECT)
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state CONNECT -> DOWN
%7|1584702589.067|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state DOWN -> INIT
Run Code Online (Sandbox Code Playgroud)
它就这样一直持续下去,甚至 Ctrl-C 也无法阻止它,我必须使用kill.
我以前使用过kafka-python,但在无法连接到代理时确实失败了。
confluent-kafka-python当经纪人不可用时,是否有一个设置可能会导致失败?
我看到过这个问题Kafka Streams Broker Connection Timeout Settings并想确定我的情况是否相同?
更新:另一个相关问题:在双宿主节点上,如果我将 advertized.listeners 配置为内部地址,并尝试从外部接口发送消息,我再次面临无限循环:
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1584727507.340|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "de-cn1:6667"
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1584727507.340|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: broker in state TRY_CONNECT connecting
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.359|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1584727507.360|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
Run Code Online (Sandbox Code Playgroud)
因此它无法解析内部名称de-cn1并且不会失败。同样的问题如何使其在此类错误上失败?
| 归档时间: |
|
| 查看次数: |
3770 次 |
| 最近记录: |