python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python 串口操作

Python中串口操作的实现示例

作者:道友老李

本文主要介绍了使用Python的pyserial库进行串口通信,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

介绍

基于Python的串口通信功能主要通过使用pyserial库来实现。pyserial是一个跨平台的Python库,它提供了对串行端口(如RS232、USB转串口等)进行读写操作的支持。下面将详细介绍如何使用pyserial来进行串口通信。

安装PySerial

首先需要安装pyserial库,可以通过pip工具轻松完成:

pip install pyserial

基本概念

使用PySerial的基本步骤

串行通信服务实现

这段代码展示了如何使用 pyserial 库来管理和操作串行端口,包括列出可用的串行端口、打开和关闭端口、发送和接收数据。以下是详细的功能解析:

环境准备

确保安装了 pyserial 库,以便能够访问和管理串行端口。

pip install pyserial

列出所有串行端口

list_ports 函数用于扫描系统中的所有串行端口,并返回一个包含这些端口名称的列表。它还记录每个端口的描述信息和硬件ID,方便调试或用户选择正确的端口。

def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        log_message(f"Port: {port}, Description: {desc}, HWID: {hwid}")
        port_names.append(port)
    return port_names

串行通信服务类 (CommService)

定义了一个用于串行通信的服务类CommService,以及一个列出所有可用串行端口的函数list_ports。它使用了Python的serial库来处理串行通信,并且引入了一些辅助模块来增强功能

导入模块

深色版本

import serial
import binascii
import serial.tools.list_ports
from utils.const_util import AppConst

列出所有可用串行端口

深色版本

def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        print(f"Port: {port}, Description: {desc}, HWID: {hwid}")
        port_names.append(port)

    return port_names

串行通信服务类

深色版本

class CommService:
    def __init__(self, comm_port):
        self.comm_port = comm_port
        self.ser_comm = None

设置串行端口

    def set_comm_port(self, comm_port):
        self.comm_port = comm_port

打开串行连接

    def open_comm(self):
        try:
            self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
                                          stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False,
                                          dsrdtr=False)
        except:
            return None
        return self.ser_comm

关闭串行连接

    def close_comm(self):
        if self.is_opened:
            self.ser_comm.close()
            self.ser_comm = None

发送数据

    def send_comm(self, data):
        if self.is_opened:
            try:
                self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
            except serial.serialutil.SerialTimeoutException:
                msg = AppConst.COMM_SEND_OVER
                return False, msg
            return True, ''
        else:
            msg = AppConst.COMM_SEND_FAILED
            return False, msg

读取数据

    def read_comm(self):
        if self.is_opened:
            if self.ser_comm.in_waiting > 0:
                try:
                    data = self.ser_comm.readline()
                except:
                    return None
                return binascii.hexlify(data).decode('ascii')
            return None
        else:
            return None

检查连接状态

    def is_opened(self):
        return self.ser_comm is not None and self.ser_comm.is_open

总结

CommService类封装了与串行设备通信所需的所有基本操作,包括打开/关闭连接、发送/接收数据以及检查连接状态。它还提供了一个静态方法list_ports来列出系统上所有的串行端口,这在需要用户选择具体端口进行通信时非常有用。此外,通过使用try-except语句,代码实现了简单的错误处理逻辑,确保即使在出现问题的情况下也能维持程序的基本运行。最后,利用AppConst中的常量,使得错误消息可以集中管理,便于维护和国际化支持。

完整代码

import serial
import binascii
import serial.tools.list_ports
from utils.const_util import AppConst


def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        print(f"Port: {port}, Description: {desc}, HWID: {hwid}")
        port_names.append(port)

    return port_names


class CommService:
    def __init__(self, comm_port):
        self.comm_port = comm_port
        self.ser_comm = None

    def set_comm_port(self, comm_port):
        self.comm_port = comm_port

    def open_comm(self):
        try:
            self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
                                          stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False,
                                          dsrdtr=False)
        except:
            return None
        return self.ser_comm

    def close_comm(self):
        if self.is_opened:
            self.ser_comm.close()
            self.ser_comm = None

    def send_comm(self, data):
        if self.is_opened:
            try:
                self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
            except serial.serialutil.SerialTimeoutException:
                msg = AppConst.COMM_SEND_OVER
                return False, msg
            return True, ''
        else:
            msg = AppConst.COMM_SEND_FAILED
            return False, msg

    def read_comm(self):
        if self.is_opened:
            if self.ser_comm.in_waiting > 0:
                try:
                    data = self.ser_comm.readline()
                except:
                    return None
                return binascii.hexlify(data).decode('ascii')

            return None
        else:
            return None

    def is_opened(self):
        return self.ser_comm is not None and self.ser_comm.is_open

使用示例

下面是一个简单的使用示例,假设已经有一个实例化好的 CommService 对象 comm_service

# 列出所有可用的串行端口
available_ports = list_ports()

# 假设选择了第一个可用端口进行通信
if available_ports:
    comm_service.set_comm_port(available_ports[0])
    ser = comm_service.open_comm()
    if ser:
        # 发送一些数据到串行端口
        success, msg = comm_service.send_comm("55 AA 01 02 03")
        if success:
            print("Data sent successfully.")
        else:
            print(f"Failed to send data: {msg}")

        # 尝试读取来自串行端口的数据
        received_data = comm_service.read_comm()
        if received_data:
            print(f"Received data: {received_data}")

        # 关闭串行端口
        comm_service.close_comm()
else:
    print("No available serial ports found.")

通过这种方式,您可以轻松地与连接到计算机的任何串行设备进行交互,无论是读取传感器数据还是控制外部硬件。

到此这篇关于Python中串口操作的实现示例的文章就介绍到这了,更多相关Python 串口操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文