Bey*_*Sam 11 python sockets client-server python-3.4 python-asyncio
我想在app.py上创建两个带有asyncio的协议(TcpClient和UdpServer),其中TcpClient将与server.py和UdpServer作为UDP服务器建立持久连接:
我需要的是:
a)两种通信协议:相互调用方法.这只是在第一个连接上工作.如果TcpClient重新连接,则无法再次发送字符串"send to tcp".来自UdpServer.我检查print(self)并且TcpClient创建一个新实例,旧的仍然存在但没有连接,但我不知道如何重构.我认为我以错误的方式使用asyncio.
b)当TcpClient与server.py断开连接时,等待5s并再次尝试重新连接,依此类推.我试图使用call_later()asyncio,但我认为有一种本地方式来做到这一点,而不是一种技巧.
c)当我启动app.py并且如果TcpClient无法连接时,我想尝试在5s后重新连接,依此类推.我不知道该怎么做.
这是我的app.py server.py的示例测试.server.py仅用于测试 - 这将是另一种语言.
只是说我尝试了什么:
1)当我启动app.py并且server.py关闭时,app.py不会重试.
2)当app.py连接到server.py并且服务器关闭并快速启动时,TcpClient重新连接,但我无法在新实例上相互连接方法并发送字符串"send to tcp".到server.py,只是旧的,没有更多的连接.
3)如果我用asyncio.async(),而不是run_until_complete()我不能打电话相互协议的方法.
我把app.py和server.py放在这里,所以你可以复制并运行测试.
我ncat localhost 9000 -u -v用来发送字符串"send to tcp.".此字符串需要打印在UdpServer类上,并传递给TcpClient类上的方法send_data_to_tcp,此方法将字符串发送到server.py.< - 首次重新连接tcpClient后,这不起作用.
我正在使用Python 3.4.0.
非常感谢.
import asyncio
#TCP client
class TcpClient(asyncio.Protocol):
message = 'Testing'
def connection_made(self, transport):
self.transport = transport
self.transport.write(self.message.encode())
print('data sent: {}'.format(self.message))
server_udp[1].tcp_client_connected()
def data_received(self, data):
self.data = format(data.decode())
print('data received: {}'.format(data.decode()))
if self.data == 'Testing':
server_udp[1].send_data_to_udp(self.data)
def send_data_to_tcp(self, data):
self.transport.write(data.encode())
def connection_lost(self, exc):
msg = 'Connection lost with the server...'
info = self.transport.get_extra_info('peername')
server_udp[1].tcp_client_disconnected(msg, info)
#UDP Server
class UdpServer(asyncio.DatagramProtocol):
CLIENT_TCP_TIMEOUT = 5.0
def __init__(self):
self.client_tcp_timeout = None
def connection_made(self, transport):
print('start', transport)
self.transport = transport
def datagram_received(self, data, addr):
self.data = data.strip()
self.data = self.data.decode()
print('Data received:', self.data, addr)
if self.data == 'send to tcp.':
client_tcp[1].send_data_to_tcp(self.data)
def connection_lost(self, exc):
print('stop', exc)
def send_data_to_udp(self, data):
print('Receiving on UDPServer Class: ', (data))
def connect_client_tcp(self):
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = loop.run_until_complete(coro)
client_tcp = asyncio.async(coro)
def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)
def tcp_client_connected(self):
if self.client_tcp_timeout:
self.client_tcp_timeout.cancel()
print('call_later cancel.')
loop = asyncio.get_event_loop()
#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
#server_udp = asyncio.Task(coro)
server_udp = loop.run_until_complete(coro)
#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
client_tcp = loop.run_until_complete(coro)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
import asyncio
class EchoServer(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
print('data received: {}'.format(data.decode()))
self.transport.write(data)
# close the socket
#self.transport.close()
#def connection_lost(self):
# print('server closed the connection')
loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
print(server)
print(dir(server))
print(dir(server.sockets))
print('serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
print("exit")
finally:
server.close()
loop.close()
Run Code Online (Sandbox Code Playgroud)
dan*_*ano 13
你真的只需要一些小修正.首先,我写了一个协程来处理连接重试:
@asyncio.coroutine
def do_connect():
global tcp_server # Make sure we use the global tcp_server
while True:
try:
tcp_server = yield from loop.create_connection(TcpClient,
'localhost', 8000)
except OSError:
print("Server not up retrying in 5 seconds...")
yield from asyncio.sleep(5)
else:
break
Run Code Online (Sandbox Code Playgroud)
然后我们用它来启动所有事情:
loop = asyncio.get_event_loop()
#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)
#TCP client
loop.run_until_complete(do_connect())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
接下来的部分是在app.py启动后处理服务器关闭/返回.我们需要修复tcp_client_disconnected并connect_client_tcp妥善处理:
def connect_client_tcp(self):
global client_tcp
task = asyncio.async(do_connect())
def cb(result):
client_tcp = result
task.add_done_callback(cb)
def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)
Run Code Online (Sandbox Code Playgroud)
有趣的是connect_client_tcp.您的原始版本有两个问题:
您client_tcp直接分配给结果asyncio.async(coro),这意味着client_tcp分配给了asyncio.Task.那不是你想要的; 你想client_tcp被分配到完成的结果asyncio.Task.我们实现了通过使用task.add_done_callback分配client_tcp到的结果Task,一旦它完成.
你忘记global client_tcp了方法的顶部.没有它,你只是创建了一个名为的局部变量client_tcp,它在最后被抛弃了connect_client_tcp.
一旦这些问题都是固定的,我能够运行app.py,启动/停止serv.py,每当我想要的,但总能看到来自正确发送的所有邮件ncat到serv.py时所有的三个组件一起运行.
这是完整的app.py,便于复制/粘贴:
import asyncio
#TCP client
class TcpClient(asyncio.Protocol):
message = 'Testing'
def connection_made(self, transport):
self.transport = transport
self.transport.write(self.message.encode())
print('data sent: {}'.format(self.message))
server_udp[1].tcp_client_connected()
def data_received(self, data):
self.data = format(data.decode())
print('data received: {}'.format(data.decode()))
if self.data == 'Testing':
server_udp[1].send_data_to_udp(self.data)
def send_data_to_tcp(self, data):
self.transport.write(data.encode())
def connection_lost(self, exc):
msg = 'Connection lost with the server...'
info = self.transport.get_extra_info('peername')
server_udp[1].tcp_client_disconnected(msg, info)
#UDP Server
class UdpServer(asyncio.DatagramProtocol):
CLIENT_TCP_TIMEOUT = 5.0
def __init__(self):
self.client_tcp_timeout = None
def connection_made(self, transport):
print('start', transport)
self.transport = transport
def datagram_received(self, data, addr):
self.data = data.strip()
self.data = self.data.decode()
print('Data received:', self.data, addr)
if self.data == 'send to tcp.':
client_tcp[1].send_data_to_tcp(self.data)
def connection_lost(self, exc):
print('stop', exc)
def send_data_to_udp(self, data):
print('Receiving on UDPServer Class: ', (data))
def connect_client_tcp(self):
global client_tcp
coro = loop.create_connection(TcpClient, 'localhost', 8000)
task = asyncio.async(do_connect())
def cb(result):
client_tcp = result
task.add_done_callback(cb)
def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)
def tcp_client_connected(self):
if self.client_tcp_timeout:
self.client_tcp_timeout.cancel()
print('call_later cancel.')
@asyncio.coroutine
def do_connect():
global client_tcp
while True:
try:
client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
except OSError:
print("Server not up retrying in 5 seconds...")
yield from asyncio.sleep(1)
else:
break
loop = asyncio.get_event_loop()
#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)
#TCP client
loop.run_until_complete(do_connect())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)