Python中串口操作的实现示例
作者:道友老李
介绍
基于Python的串口通信功能主要通过使用pyserial
库来实现。pyserial
是一个跨平台的Python库,它提供了对串行端口(如RS232、USB转串口等)进行读写操作的支持。下面将详细介绍如何使用pyserial
来进行串口通信。
安装PySerial
首先需要安装pyserial
库,可以通过pip工具轻松完成:
pip install pyserial
基本概念
波特率(Baud Rate):表示每秒传输的数据位数,是串行通信中的一个重要参数,必须确保发送端和接收端设置相同的波特率。
数据位(Data Bits):通常为7或8位,指每次传输的数据量大小。
停止位(Stop Bits):用于标识一个字节结束的数量,默认情况下为1个停止位。
校验位(Parity Bit):可选参数,用来检测传输错误,有无校验、奇校验和偶校验三种选择。
流控制(Flow Control):硬件流控(RTS/CTS, DTR/DSR)或软件流控(XON/XOFF),用于管理数据流动。
使用PySerial的基本步骤
导入模块
import serial
from serial.tools import list_ports
查找可用的串口
- 可以使用
list_ports.comports()
函数列出系统中所有可用的串口设备。
- 可以使用
打开串口连接
- 使用
serial.Serial()
创建一个串口对象,并指定相应的参数(如端口号、波特率等)。
- 使用
配置串口参数
- 在创建串口对象时可以传递额外的关键字参数来设置波特率、数据位、停止位、校验位等。
读取与写入数据
- 通过调用串口对象的
read()
、write()
方法来进行数据的读取和发送。
- 通过调用串口对象的
关闭串口连接
- 当不再需要使用串口时,应该调用
close()
方法来释放资源。
- 当不再需要使用串口时,应该调用
串行通信服务实现
这段代码展示了如何使用 pyserial
库来管理和操作串行端口,包括列出可用的串行端口、打开和关闭端口、发送和接收数据。以下是详细的功能解析:
环境准备
确保安装了 pyserial
库,以便能够访问和管理串行端口。
pip install pyserial
列出所有串行端口
list_ports
函数用于扫描系统中的所有串行端口,并返回一个包含这些端口名称的列表。它还记录每个端口的描述信息和硬件ID,方便调试或用户选择正确的端口。
def list_ports(): port_names = [] ports = serial.tools.list_ports.comports() for port, desc, hwid in ports: log_message(f"Port: {port}, Description: {desc}, HWID: {hwid}") port_names.append(port) return port_names
串行通信服务类 (CommService)
定义了一个用于串行通信的服务类CommService
,以及一个列出所有可用串行端口的函数list_ports
。它使用了Python的serial
库来处理串行通信,并且引入了一些辅助模块来增强功能
导入模块
深色版本
import serial import binascii import serial.tools.list_ports from utils.const_util import AppConst
serial
:提供了对串行端口(如RS232、USB转串等)的访问。binascii
:包含将二进制数据转换为ASCII字符串的函数。serial.tools.list_ports
:提供工具函数来枚举系统上的串行端口。utils.const_util.AppConst
:自定义模块中定义的应用常量。
列出所有可用串行端口
深色版本
def list_ports(): port_names = [] ports = serial.tools.list_ports.comports() for port, desc, hwid in ports: print(f"Port: {port}, Description: {desc}, HWID: {hwid}") port_names.append(port) return port_names
list_ports
函数:遍历所有可用的串行端口,并打印每个端口的名称、描述和硬件ID。- 返回一个包含所有端口名称的列表,可用于选择具体的通信端口。
串行通信服务类
深色版本
class CommService: def __init__(self, comm_port): self.comm_port = comm_port self.ser_comm = None
- 构造方法
__init__
:初始化时设置串行端口名称,并将实际的串行连接对象初始化为None
。
设置串行端口
def set_comm_port(self, comm_port): self.comm_port = comm_port
set_comm_port
方法:允许动态更改串行端口名称。
打开串行连接
def open_comm(self): try: self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False) except: return None return self.ser_comm
open_comm
方法:尝试打开指定的串行端口,并配置波特率、字节大小、校验位、停止位等参数。- 如果打开失败(例如端口不存在或已被占用),捕获异常并返回
None
;否则返回打开的串行连接对象。
关闭串行连接
def close_comm(self): if self.is_opened: self.ser_comm.close() self.ser_comm = None
close_comm
方法:检查串行连接是否打开,如果是,则关闭连接并重置连接对象为None
。
发送数据
def send_comm(self, data): if self.is_opened: try: self.ser_comm.write(bytearray.fromhex(data.replace(' ', ''))) except serial.serialutil.SerialTimeoutException: msg = AppConst.COMM_SEND_OVER return False, msg return True, '' else: msg = AppConst.COMM_SEND_FAILED return False, msg
send_comm
方法:如果串行连接是打开状态,尝试发送十六进制格式的数据。- 使用
bytearray.fromhex()
将十六进制字符串转换为字节数组后发送。 - 捕获可能发生的超时异常,并返回相应的错误信息。
- 成功发送后返回
True
及空消息,发送失败则返回False
及错误消息。
读取数据
def read_comm(self): if self.is_opened: if self.ser_comm.in_waiting > 0: try: data = self.ser_comm.readline() except: return None return binascii.hexlify(data).decode('ascii') return None else: return None
read_comm
方法:如果串行连接是打开状态并且有数据待读取,尝试读取一行数据。- 使用
binascii.hexlify()
将读取到的数据转换为十六进制表示形式,并解码为ASCII字符串。 - 如果发生异常或者没有数据可读,则返回
None
。
检查连接状态
def is_opened(self): return self.ser_comm is not None and self.ser_comm.is_open
is_opened
方法:检查当前是否有有效的串行连接对象,并且该连接是否处于打开状态。
总结
CommService
类封装了与串行设备通信所需的所有基本操作,包括打开/关闭连接、发送/接收数据以及检查连接状态。它还提供了一个静态方法list_ports
来列出系统上所有的串行端口,这在需要用户选择具体端口进行通信时非常有用。此外,通过使用try-except
语句,代码实现了简单的错误处理逻辑,确保即使在出现问题的情况下也能维持程序的基本运行。最后,利用AppConst
中的常量,使得错误消息可以集中管理,便于维护和国际化支持。
完整代码
import serial import binascii import serial.tools.list_ports from utils.const_util import AppConst def list_ports(): port_names = [] ports = serial.tools.list_ports.comports() for port, desc, hwid in ports: print(f"Port: {port}, Description: {desc}, HWID: {hwid}") port_names.append(port) return port_names class CommService: def __init__(self, comm_port): self.comm_port = comm_port self.ser_comm = None def set_comm_port(self, comm_port): self.comm_port = comm_port def open_comm(self): try: self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False) except: return None return self.ser_comm def close_comm(self): if self.is_opened: self.ser_comm.close() self.ser_comm = None def send_comm(self, data): if self.is_opened: try: self.ser_comm.write(bytearray.fromhex(data.replace(' ', ''))) except serial.serialutil.SerialTimeoutException: msg = AppConst.COMM_SEND_OVER return False, msg return True, '' else: msg = AppConst.COMM_SEND_FAILED return False, msg def read_comm(self): if self.is_opened: if self.ser_comm.in_waiting > 0: try: data = self.ser_comm.readline() except: return None return binascii.hexlify(data).decode('ascii') return None else: return None def is_opened(self): return self.ser_comm is not None and self.ser_comm.is_open
使用示例
下面是一个简单的使用示例,假设已经有一个实例化好的 CommService
对象 comm_service
:
# 列出所有可用的串行端口 available_ports = list_ports() # 假设选择了第一个可用端口进行通信 if available_ports: comm_service.set_comm_port(available_ports[0]) ser = comm_service.open_comm() if ser: # 发送一些数据到串行端口 success, msg = comm_service.send_comm("55 AA 01 02 03") if success: print("Data sent successfully.") else: print(f"Failed to send data: {msg}") # 尝试读取来自串行端口的数据 received_data = comm_service.read_comm() if received_data: print(f"Received data: {received_data}") # 关闭串行端口 comm_service.close_comm() else: print("No available serial ports found.")
通过这种方式,您可以轻松地与连接到计算机的任何串行设备进行交互,无论是读取传感器数据还是控制外部硬件。
到此这篇关于Python中串口操作的实现示例的文章就介绍到这了,更多相关Python 串口操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!