tim*_*tea 10 python serial-port pyserial python-3.4 python-asyncio
我正在尝试,到目前为止,未能使用python asyncio访问串行端口.
我非常感谢在简单的fd上使用新的python异步框架的任何提示.
干杯!
詹姆士
这是使用pyserial-asyncio的工作示例:
from asyncio import get_event_loop
from serial_asyncio import open_serial_connection
async def run():
reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
while True:
line = await reader.readline()
print(str(line, 'utf-8'))
loop = get_event_loop()
loop.run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)
这是使用 FD 的另一种方式
import asyncio
import serial
s = serial.Serial('/dev/pts/13', 9600)
def test_serial():
'''
read a line and print.
'''
text = ""
msg = s.read().decode()
while (msg != '\n'):
text += msg
msg = s.read().decode()
print(text)
loop.call_soon(s.write, "ok\n".encode())
loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud)
pySerial 正在获得直接asyncio
支持。它现在处于实验状态,但对我来说按预期工作。
从文档中获取的示例:
class Output(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.rts = False
transport.write(b'hello world\n')
def data_received(self, data):
print('data received', repr(data))
self.transport.close()
def connection_lost(self, exc):
print('port closed')
asyncio.get_event_loop().stop()
loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Run Code Online (Sandbox Code Playgroud)
感谢大家的建议,最后,我以稍微不同的方式解决了问题,并使用了asyncio中支持良好的套接字连接,但随后使用了ser2net(http://sourceforge.net/projects/ser2net/)来访问串行端口。
配置大约需要 10 秒,这意味着 python 代码现在也可以处理远程串行端口的访问。
归档时间: |
|
查看次数: |
9421 次 |
最近记录: |