小编Yuv*_*sty的帖子

如何使用 python kafka 库处理 kafka 的连接问题?

我有一个无服务器功能,正在尝试向卡夫卡发送一些数据。有时它可以工作,有时连接会断开并且数据丢失。

原因是 kafka 库没有引发异常,而是添加错误日志。所以我无法将我的代码添加到try:except.

这是我的日志中经常遇到的错误:

<BrokerConnection node_id=11 host=... port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
File "/var/task/kafka/conn.py", line 745, in _recv
data = self._sock.recv(SOCK_CHUNK_BYTES)
ConnectionResetError: [Errno 104] Connection reset by peer
Run Code Online (Sandbox Code Playgroud)

上述函数_recv定义如下:

我仍在寻找解决方案,但在 try: except 中添加代码不起作用。

def _recv(self):
        responses = []
        SOCK_CHUNK_BYTES = 4096
        while True:
            try:
                data = self._sock.recv(SOCK_CHUNK_BYTES)
                # We expect socket.recv to raise an exception if there is not
                # enough data to read the full bytes_to_read
                # …
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-kafka kafka-python kafka-producer-api

5
推荐指数
0
解决办法
1598
查看次数