在python3 asyncio中使用串口

tim*_*tea 10 python serial-port pyserial python-3.4 python-asyncio

我正在尝试,到目前为止,未能使用python asyncio访问串行端口.

我非常感谢在简单的fd上使用新的python异步框架的任何提示.

干杯!

詹姆士

pde*_*nes 8

这是使用pyserial-asyncio的工作示例:

from asyncio import get_event_loop
from serial_asyncio import open_serial_connection

async def run():
    reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
    while True:
        line = await reader.readline()
        print(str(line, 'utf-8'))

loop = get_event_loop()
loop.run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)


Rod*_*xPy 5

这是使用 FD 的另一种方式

import asyncio
import serial

s = serial.Serial('/dev/pts/13', 9600)


def test_serial():
    '''
    read a line and print.
    '''
    text = ""
    msg = s.read().decode()
    while (msg != '\n'):
        text += msg
        msg = s.read().decode()
    print(text)
    loop.call_soon(s.write, "ok\n".encode())

loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
Run Code Online (Sandbox Code Playgroud)


Gün*_*ena 5

pySerial 正在获得直接asyncio支持。它现在处于实验状态,但对我来说按预期工作。

从文档中获取的示例:

class Output(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print('port opened', transport)
        transport.serial.rts = False
        transport.write(b'hello world\n')

    def data_received(self, data):
        print('data received', repr(data))
        self.transport.close()

    def connection_lost(self, exc):
        print('port closed')
        asyncio.get_event_loop().stop()

loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Run Code Online (Sandbox Code Playgroud)

  • 显然,这种支持已从`pyserial` 中删除,而是由单独的包`pyserial-asyncio` 支持。请参阅 [pySerial API — pySerial 3.3 文档](http://pyserial.readthedocs.io/en/latest/pyserial_api.html#asyncio) 和 [简短介绍 — pySerial-asyncio 0.4 文档](http://pyserial-asyncio. readthedocs.io/en/latest/shortintro.html)。 (7认同)

tim*_*tea 0

感谢大家的建议,最后,我以稍微不同的方式解决了问题,并使用了asyncio中支持良好的套接字连接,但随后使用了ser2net(http://sourceforge.net/projects/ser2net/)来访问串行端口。

配置大约需要 10 秒,这意味着 python 代码现在也可以处理远程串行端口的访问。