我目前正在使用这个lib来测试我设置的kafka服务器:https://github.com/dsully/pykafka
import kafka
import time
def test_kafka_server(n=1):
for i in range(0,n):
producer = kafka.producer.Producer('test',host='10.137.8.192')
message = kafka.message.Message(str(time.time()))
producer.send(message)
producer.disconnect()
def main():
test_kafka_server(100000)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
最终发生的事情是我最终超载了我自己的本地机器.
我得到错误10055,根据谷歌意味着"Windows已经用完了TCP/IP套接字缓冲区,因为有太多连接一次打开." 根据netstat,producer.disconnect()没有关闭套接字,而是将其置于某个TIME_WAIT状态.
ipython调试器指向这一行:
C:\Python27\lib\socket.pyc in meth(name, self, *args)
222 proto = property(lambda self: self._sock.proto, doc="the socket protocol")
223
--> 224 def meth(name,self,*args):
225 return getattr(self._sock,name)(*args)
226
Run Code Online (Sandbox Code Playgroud)
作为罪魁祸首,但这似乎陷入了低于我感到满意的事情.
我搜索过,发现这个Python套接字没有正确关闭连接,建议这样做:
setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Run Code Online (Sandbox Code Playgroud)
所以,我在io.py文件中使用该选项重建了pykafka lib:
def connect(self):
""" Connect to the Kafka server. """ …Run Code Online (Sandbox Code Playgroud) 我有一个简单的服务器/客户端.我使用netcat作为客户端来测试服务器.如果我在客户端退出之前停止服务器,我将无法再启动服务器一段时间,我发现此错误:"[Errno 98]地址已在使用中"
但如果我先关闭客户端,然后服务器停止,我将不会有这个问题.
我的服务器套接字如下:
try:
s=socket
s.bind(..)
s.listen(1)
conn,addr=s.accept()
finally:
conn.close()
s.close()
Run Code Online (Sandbox Code Playgroud)
我觉得服务器没有正确关闭套接字.但我不知道如何解决这个问题.