如何在树莓派上实现Modbus?

Ove*_*mon 3 python modbus rs485 raspberry-pi raspberry-pi4

我目前正在进行一个项目,尝试以 Raspberry Pi 4 作为主站来实现 Modbus,并控制多个执行器作为从站。为此,我为我的 Pi 购买了一个特殊的防护罩。我运行了一个演示测试程序,确认 Pi 可以与其新的扩展板配合使用,但后来遇到了麻烦。

Shield 用户手册- 在用户手册文件夹内。

掌握:

## To install dependencies:
## sudo pip3 install modbus-tk
##################################################################################################
import serial
import fcntl
import os
import struct
import termios
import array
#import modbus lib
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu

# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    #set modbus master
    master = modbus_rtu.RtuMaster(
           serial.Serial(port= '/dev/ttySC0',
           baudrate=9600,
           bytesize=8,
           parity='N',
           stopbits=1,
           xonxoff=0)
       )

    master.set_timeout(5.0)
    master.set_verbose(True)
    logger.info("connected")

    logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 4))

    #send some queries
    #logger.info(master.execute(1, cst.READ_COILS, 0, 10))
    #logger.info(master.execute(1, cst.READ_DISCRETE_INPUTS, 0, 8))
    #logger.info(master.execute(1, cst.READ_INPUT_REGISTERS, 100, 3))
    #logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_COIL, 7, output_value=1))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 100, output_value=54))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1]))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 100, output_value=xrange(12)))

#end of if __name__ == '__main__': 
Run Code Online (Sandbox Code Playgroud)

奴隶:

import sys

import serial
import fcntl
import os
import struct
import termios
import array
import time

import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu
# RS485 ioctls
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of def rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    logger = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")

    #Create the server
    server = modbus_rtu.RtuServer(serial.Serial('/dev/ttySC1'))

    try:
        logger.info("running...")
        logger.info("enter 'quit' for closing the server")

        server.start()

        slave_1 = server.add_slave(1)
        slave_1.add_block('0', cst.HOLDING_REGISTERS, 0, 100)
        while True:
            cmd = sys.stdin.readline()
            args = cmd.split(' ')

            if cmd.find('quit') == 0:
                sys.stdout.write('bye-bye\r\n')
                break

            elif args[0] == 'add_slave':
                slave_id = int(args[1])
                server.add_slave(slave_id)
                sys.stdout.write('done: slave %d added\r\n' % (slave_id))

            elif args[0] == 'add_block':
                slave_id = int(args[1])
                name = args[2]
                block_type = int(args[3])
                starting_address = int(args[4])
                length = int(args[5])
                slave = server.get_slave(slave_id)
                slave.add_block(name, block_type, starting_address, length)
                sys.stdout.write('done: block %s added\r\n' % (name))

            elif args[0] == 'set_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                values = []
                for val in args[4:]:
                    values.append(int(val))
                slave = server.get_slave(slave_id)
                slave.set_values(name, address, values)
                values = slave.get_values(name, address, len(values))
                sys.stdout.write('done: values written: %s\r\n' % (str(values)))

            elif args[0] == 'get_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                length = int(args[4])
                slave = server.get_slave(slave_id)
                values = slave.get_values(name, address, length)
                sys.stdout.write('done: values read: %s\r\n' % (str(values)))

            else:
                sys.stdout.write("unknown command %s\r\n" % (args[0]))
    finally:
        server.stop()
Run Code Online (Sandbox Code Playgroud)

我计划使用的执行器是Linak LA36。我相信这些是我将使用的功能:

在此输入图像描述

文档第 21-22 页。

墙只是简单地开始使用 Modbus。我研究了执行器的技术文档以确定要发送的内容,但我在编写程序时迷失了方向。我原本希望能够修改演示程序以满足我的需要,但无法理解其中的代码用途。

在互联网上搜索时,我试图找到有关不同变量和函数的作用的教程或描述,以更好地理解,但找不到类似的内容。我确实找到了演示代码的起源但无法找到/理解任何可以帮助我的东西。

我已经看到有些程序应该通过 Raspberry Pi 启用 Modbus(例如PyModbus),但我不确定我的情况是否有所不同,有一个特殊的屏蔽,以及这些程序是否适用于我的设置?

所以,最后,我在这里希望得到一些帮助。建议、说明、示例,在这一点上,任何能让我走得更远的东西都是受欢迎的。也可能使用演示代码作为基础是一个错误,有人可以给我指出不同的方向吗?

我非常愿意尝试不同的事情,并且感谢任何帮助。

先感谢您。

更新:

从那时起,我一直在寻找其他选项,并偶然发现了我正在尝试使用的minimalmodbus 。RS485 扩展板仍处于演示配置......

RS485屏蔽

...我一直在尝试在Python解释器中执行从minimalmodbus找到的一些代码:

>>> import minimalmodbus
>>> instr = minimalmodbus.Instrument('/dev/ttySC0', 1)
>>> instr
minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>
>>> instr.read_register(24, 1)
5.0
>>> instr.write_register(24, 450, 1)
>>> instr.read_register(24, 1)
Run Code Online (Sandbox Code Playgroud)

我将'/dev/ttyUSB0'(在原始代码中)更改为'/dev/ttySC0'。现在我被困在:

>>> instr
    minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>
Run Code Online (Sandbox Code Playgroud)

这给出了SyntaxError: 无效的语法突出显示minimalmodbus

Mar*_* G. 6

我认为你需要从简单的事情开始,并在此基础上进行构建。在您的评论中,您声明您想使用minimalmodbus,这很好,但让我们从演示代码开始,并尝试首先使用您的执行器。稍后您可以返回其他库,例如minimalmodbuspymodbus

在我们进入代码之前,我认为您应该了解 Modbus 是什么。本质上,Modbus 使用串行端口(它可以通过 RS485 或更传统的 RS232 或通过 TTL 电平,但这只是物理层,用于传递信息的电气电平;您已经在帽子上安装了 RS485 端口,并且您的设备上已安装了 RS485 端口。执行器也通过 RS485 工作,因此只要正确连接总线(A 到 A 和 B 到 B),就无需担心这方面的问题。

那么,除了串口之外,还需要Modbus什么呢?Modbus 以主从配置工作。这意味着只有一台主设备(您的 Raspberry Pi 计算机)和一个或多个从设备(您的执行器)。根据我们在上一段中所说的,您的 Modbus 在两线(具有 RS485 电平)总线上运行。在此配置中,与更通用的 RS232 标准相反,其中具有三根线:RX、TX 和 GND,您无法进行全双工通信(只有主站或所有从站之一可以与总线通信,而所有从站都可以与总线通信)。其他设备进行监听,类似于对讲机无线电链路)。为了进一步扩展这个类比,就像您在 WT 无线电上需要一个 PTT(即按即说)按钮一样,您需要为您的 Modbus 至PTT上的主站或任何从站提供一个信号,当他们想要讲话。部分RS485收发器具有该功能,由硬件实现;在你的帽子上,没有详细检查电路并查看演示代码,总线上的方向控制似乎是在具有该rs485_enable()功能的软件中实现的。编辑:详细查看硬件,我必须纠正自己:您的 RS485 帽子实际上是通过硬件及其 SPI 到双 UART SC16IS752进行方向控制。该芯片设计为向后兼容使用流量控制 RTS 信号作为方向控制(我们之前提到的 PTT 功能)的旧 UARTS。这就是为什么你需要rs485_enable().

理论已经足够了,现在是实践部分。您似乎错过了一件重要的事情。在您链接的手册第 21 页上,您有以下段落:

在集成到 MODBUS 系统之前,必须检查并最终更改执行器的一些参数。该准备工作是通过使用BusLink PC 工具(该工具将在后面详细描述)完成的,并保证执行器能够执行基本功能。可能需要进一步微调以满足系统或应用程序的要求。

然后,如果您返回第 12 页的表格,您将看到他们所谓的寻址(通常称为从机 ID)默认为 24​​7(未分配)。因此,您需要做的第一件事是使用 Buslink PC 工具将执行器上的地址设置为 1 到 246 之间的任意数字(如果您计划在总线上连接多个执行器,则必须设置不同的数字对于每个执行器)。详细信息请参见第 28 页。

成功完成该配置后,您应该能够运行演示主代码。例如,如果您想将执行器移动 10 毫米,您可以尝试:

## To install dependencies:
## sudo pip3 install modbus-tk
##################################################################################################
import serial
import fcntl
import os
import struct
import termios
import array
#import modbus lib
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu

# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",19200)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    #set modbus master
    master = modbus_rtu.RtuMaster(
           serial.Serial(port= '/dev/ttySC0',
           baudrate=9600,
           bytesize=8,
           parity='N',
           stopbits=1,
           xonxoff=0)
       )

    master.set_timeout(5.0)
    master.set_verbose(True)
    logger.info("connected")

    logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 1, output_value=100))  #Write target position 10mm (1/10mm*100)
    logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 2, output_value=1))  #Move actuator
Run Code Online (Sandbox Code Playgroud)

请注意,我只更改了演示代码的最后两行。前面的第一个 1cst.WRITE_SINGLE_REGISTER必须与您使用 BusLink PC 工具设置的从站地址相同。后面的数字(第一行1,第二行2)就是你根据手册第22页需要写入的寄存器号。最后,output_value是您需要在每个寄存器上写入的值。在寄存器 1 上,您需要写入要将执行器从其参考位置(以 0.1 毫米的倍数测量)移动的目标位置,并在第二个寄存器上写入 1(再次参见手册第 22 页上的表格,步骤 2 和3)。

您可以通过读取输入寄存器 3 和 5 来完成步骤 4 的序列。请注意,要读取输入寄存器,功能代码为cst.READ_INPUT_REGISTERS

尝试一下,看看是否能成功。完成后我们可以看一下minimalmodbus

编辑:更好地了解硬件的工作原理(请参阅上面的编辑),现在很清楚您可以使用您喜欢的任何 Modbus 库,您只需保留上面的演示代码并在开始发送数据之前#end of rs485_enable():调用某处即可。rs485_enable()

对于minimalmodbus,你可以尝试这样的事情:

import serial
import fcntl
import os
import struct
import termios
import array
#Remove modbus-tk imports and add minimalmodbus
import minimalmodbus


# only /dev/ttySC0 will be used
# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",19200)    


def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

#end of rs485_enable():


if __name__ == '__main__':

    actuator = minimalmodbus.Instrument('/dev/ttySC0', 1) # port name, slave address (in decimal), change according to actuator address

    rs485_enable()   #you need to keep this for your hat to work

    #minimalmodbus setup
    actuator.serial.port               # this is the serial port name
    actuator.serial.baudrate = 19200   # Baud rate
    actuator.serial.bytesize = 8
    actuator.serial.parity   = serial.PARITY_NONE
    actuator.serial.stopbits = 1
    actuator.serial.timeout  = 0.05   # seconds

    actuator.address     # this is the slave (actuator) address number
    actuator.mode = minimalmodbus.MODE_RTU   # rtu mode

    #write registers
    actuator.write_register(1, 100)  #write target distance to move
    actuator.write_register(2, 1)    #Move!
Run Code Online (Sandbox Code Playgroud)

  • 好吧,它开始看起来像有用的东西了。我已经使用 Arduino 和 USB 转 RS485 适配器进行了一些测试,以便我可以在计算机上使用从属模拟器进行测试。我已成功发送带有“cst.WRITE_SINGLE_REGISTER”的命令,并且它还接受“cst.READ_INPUT_REGISTERS”。然而,我在如何读取这些值方面遇到了一些障碍。看来我不能简单地说“read = ...cst.READ_INPUT_REGISTERS...”然后打印“read”。我可以寻求帮助如何将读取的值输入到程序中吗? (2认同)