I/O密集型串口应用:从线程,基于队列的设计移植到异步(ala Twisted)

bsi*_*sco 4 python multithreading serial-port twisted

因此,我一直在开发一个客户端应用程序,它通过串行(RS-232)"Master"与无线设备通信.我目前使用线程编写应用程序的核心(见下文).我注意到#python频道的共识似乎是不要使用线程,而是改用Twisted的异步通信功能.

我无法找到使用Twisted进行串口异步I/O通信的任何好例子.不过,我找到了Dave Peticolas的"Twisted Introduction"(感谢nosklo),目前正在学习它;但它使用的是套接字而不是串行通信(不过其中对异步概念的解释确实非常好).

我该如何将这个使用线程和队列的应用程序移植到Twisted?这样做有哪些优点/缺点(我注意到,有时如果某个线程挂起,系统会蓝屏(BSOD))?

代码(msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

# Turn on automatic garbage collection (this is already the CPython default,
# so the call is normally a no-op).
gc.enable()
# Framing bytes handed to ProtocolWrapper as its header / footer / dle
# arguments when a received frame is validated.
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

# Seed value passed to calcString() when computing a message CRC.
INITIAL_MODBUS = 0xFFFF


class Poller(object):
    """
    Connects to the serial port and polls nodes for data.

    ``start()`` spawns two daemon threads: one runs ``reader()`` (reads
    responses, validates them and prints the parsed message) and one runs
    ``writer()`` (sends a POLL_NODE request to every configured node in
    turn).  ``stop()`` asks both loops to finish and closes the port.
    """

    # (module code, serial numbers) polled, in order, on every writer cycle.
    POLL_TARGETS = (
        ('DRILLRIG', (1,)),
        ('POWERPLANT', (1, 2)),
        ('GENSET', (1,)),
        ('MUDPUMP', (1,)),
    )
    # Size in bytes of one complete node response.
    FRAME_SIZE = 96
    # Pause after each request / each read, in seconds.
    POLL_INTERVAL = .2
    # Pause while the port has nothing to read, so the reader does not spin.
    IDLE_INTERVAL = .01
    # Longest time stop() waits for each worker thread, in seconds.
    JOIN_TIMEOUT = 2.0

    def __init__(self,
            port,
            baudrate,
            parity,
            rtscts,
            xonxoff,
            echo=False):
        """
        Open the serial port and initialise the poller state.

        port, baudrate, parity, rtscts and xonxoff are forwarded to pyserial.
        echo is accepted for interface compatibility and is currently unused.
        """
        # serial_for_url is not available in every pyserial release; fall
        # back to the plain Serial class when the attribute is missing.
        open_port = getattr(serial, 'serial_for_url', serial.Serial)
        self.serial = open_port(port,
                baudrate,
                parity=parity,
                rtscts=rtscts,
                xonxoff=xonxoff,
                timeout=.01)
        # Both attributes are always defined, whichever way the port was
        # opened.
        self.com_data_q = None
        self.com_error_q = None
        self.livefeed = LiveDataFeed()
        self.timer = time.time()
        self.dtr_state = True
        self.rts_state = True
        self.break_state = False
        # Defined up front so reader()/writer()/stop() are safe to call even
        # if start() has not run yet.
        self.alive = False
        self.data_q = None
        self.error_q = None
        self.mon_thread = None
        self.trans_thread = None

    def start(self):
        """Create the queues and launch the reader and writer threads."""
        self.data_q = Queue.Queue()
        self.error_q = Queue.Queue()
        # NOTE(review): error_q was created on the line above, so this lookup
        # presumably always yields None -- confirm against
        # get_item_from_queue.
        com_error = get_item_from_queue(self.error_q)
        if com_error is not None:
            print('Error %s' % (com_error))
        self.timer = time.time()
        self.alive = True

        # start monitor thread
        self.mon_thread = self._spawn(self.reader)

        # start sending thread
        self.trans_thread = self._spawn(self.writer)

    @staticmethod
    def _spawn(target):
        """Start *target* in a daemon thread and return the thread."""
        worker = threading.Thread(target=target)
        worker.daemon = True
        worker.start()
        return worker

    def stop(self):
        """
        Stop both loops, wait briefly for the threads, then close the port.

        The port is closed only after the workers had a chance to leave their
        loops, so they are not interrupted in the middle of a read or write.
        Safe to call from a worker thread and before start().
        """
        self.alive = False
        current = threading.current_thread()
        workers = (getattr(self, 'mon_thread', None),
                   getattr(self, 'trans_thread', None))
        try:
            for worker in workers:
                # A thread cannot join itself.
                if (worker is not None and worker is not current
                        and worker.is_alive()):
                    worker.join(self.JOIN_TIMEOUT)
        finally:
            self.serial.close()

    def reader(self):
        """
        Reads data from the serial port using self.mon_thread.
        Displays that data on the screen.

        Complete FRAME_SIZE-byte frames are validated and printed straight
        away; chunks of any other length are put on self.data_q.  On any
        exception the poller is marked dead, the port is closed and the
        exception is re-raised.
        """
        from rmsg_format import message_crc, message_format
        while self.alive:
            try:
                if self.serial.inWaiting() == 0:
                    # Nothing pending: yield instead of busy-waiting.
                    time.sleep(self.IDLE_INTERVAL)
                    continue

                # Read node data from the serial port. Data should be 96B.
                data = self.serial.read(self.FRAME_SIZE)
                data += self.serial.read(self.serial.inWaiting())

                if data:
                    if len(data) == self.FRAME_SIZE:
                        # Handle the frame just read.  Round-tripping it
                        # through data_q could return an older, stale chunk.
                        self._handle_frame(data, message_crc, message_format)
                    else:
                        self.data_q.put(data)
                time.sleep(self.POLL_INTERVAL)
            except (KeyboardInterrupt, SystemExit, Exception):
                self.alive = False
                self.serial.close()
                raise

    def _handle_frame(self, msg, message_crc, message_format):
        """Check framing and CRC of one frame and print the parsed result."""
        from datetime import datetime

        pw = ProtocolWrapper(
                    header=PROTOCOL_HEADER,
                    footer=PROTOCOL_FOOTER,
                    dle=PROTOCOL_DLE)
        # Feed all the bytes of 'msg' sequentially into pw.input
        status = [pw.input(byte) for byte in msg]
        if status[-1] != ProtocolStatus.IN_MSG:
            return

        # Parse the received CRC into a 16-bit integer
        rec_crc = message_crc.parse(msg[-4:]).crc

        # Compute the CRC on the message
        calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
        ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        if rec_crc != calc_crc:
            print(ts)
            print('ERROR: CRC Mismatch')
            print(msg.encode('hex'))
        else:
            print(message_format.parse(msg[1:]))
            gc.collect()

    def writer(self):
        """
        Builds the packet to poll each node for data.
        Writes that data to the serial port using self.trans_thread

        Every entry of POLL_TARGETS is polled in order until self.alive
        becomes false.  On any exception the poller is marked dead, the port
        is closed and the exception is re-raised (same policy as reader()).
        """
        try:
            while self.alive:
                for code, serial_numbers in self.POLL_TARGETS:
                    for ser_no in serial_numbers:
                        # Re-check between requests so stop() takes effect
                        # within one POLL_INTERVAL.
                        if not self.alive:
                            return
                        self._poll_node(code, ser_no)
        except (KeyboardInterrupt, SystemExit, Exception):
            self.alive = False
            self.serial.close()
            raise

    def _poll_node(self, module_code, ser_no):
        """Send one POLL_NODE request to the given node, then pause."""
        msg = build_message_to_send(
                data_len=0x10,
                dest_module_code=module_code,
                dest_ser_no=ser_no,
                dest_customer_code='*****',
                ret_ser_no=0x01,
                ret_module_code='DOGHOUSE',
                ret_customer_code='*****',
                command='POLL_NODE',
                data=[])
        self.serial.write(msg)
        time.sleep(self.POLL_INTERVAL)
        gc.collect()


def main(port='COM4', baudrate=115200):
    """
    Run the poller until it stops itself or the user presses Ctrl-C.

    poller.start() already runs reader() and writer() in their own threads,
    so the main thread only waits.  Calling reader() here as well would put
    two readers on the same port and writer() would never be reached.
    """
    poller = Poller(
            port=port,
            baudrate=baudrate,
            parity=serial.PARITY_NONE,
            rtscts=0,
            xonxoff=0,
            )
    poller.start()
    try:
        # Both worker threads are daemons; keep the process alive for them.
        while poller.alive:
            time.sleep(.5)
    except KeyboardInterrupt:
        pass
    finally:
        poller.stop()


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

Sen*_*ran 7

在线程/队列方法和使用Twisted的方法之间编写直接的一对一映射程序是非常困难的(如果不是不可能的话).

我建议您先熟悉Twisted及其reactor的工作方式,以及它对Protocol和协议特定方法的使用.可以这样理解:当您在Twisted中使用Deferred时,您之前用线程和队列显式编码的所有异步事件处理都是免费提供给您的.

Twisted似乎通过SerialPort传输类在其reactor上支持串口通信,基本结构大致如下.

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

# Attach the protocol instance to the serial device.  YourProtocolClass, Port
# and baudrate are placeholders to be supplied by the application.
SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate)
# Hand control to the reactor's event loop.
reactor.run()
Run Code Online (Sandbox Code Playgroud)

在YourProtocolClass()中,您将处理特定于串行端口通信要求的各种事件.doc/core/examples目录包含gpsfix.py和mouse.py等示例.