# -*- coding: UTF-8 -*- import serial import time import binascii ''' 描述:串口同步基类 ''' class BaseSerial(): # 构造函数 def __init__(self): self.exception = None self.ser = serial.Serial() ''' 打开串口 ''' def open(self, port, baudrate=115200, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=3, writeTimeout=0.5): if type(port) == type(0): self.ser.port = "COM" + str(port) else: self.ser.port = port # 波特率; self.ser.baudrate = baudrate # 数据位; self.ser.bytesize = bytesize # 校验码; self.ser.parity = parity # PARITY_NONE=0 # 停止位; self.ser.stopbits = stopbits # 串口读超时值 self.ser.timeout = timeout # 串口写超时值 self.ser.write_timeout = writeTimeout try: self.ser.open() # 返回结果; return self.ser.is_open except Exception as e: self.exception = e print('打开串口异常', str(e)) return False ''' 重新打开串口 ''' def reOpen(self): self.exception = None self.ser.close() try: self.ser.open() # 返回结果; return self.ser.is_open except Exception as e: self.exception = e return False '''关闭串口''' def close(self): self.ser.close() ''' 描述:写串口 ''' def write(self, cmd: bytearray): try: self.exception = None writelen = self.ser.write(cmd) # flush等待写入完成; self.ser.flush() print("writehex:" + str(binascii.b2a_hex(cmd)) + " writelen=" + str(writelen)) return True if writelen == len(cmd) else False except Exception as e: self.exception = e print('写串口异常', str(e)) return False ''' 描述:读取串口数据; 返回:返回16进制字符串; 补充:ser.in_waiting 实际是缓冲区里的数据长度,也就是要读的长度; ''' def read(self, size=0): try: self.exception = None # 先读取1个,如果超时表示通讯失败; bys = self.ser.read(1) for i in range(0, 3): while self.ser.in_waiting > 0: d = self.ser.read(1) bys = bys + d # 每当缓冲区为0,等10ms,循环3次确保机器响应完全; time.sleep(0.01) # 再判断是否读取到指定大小(size!=0); if bys.__len__() < size: time.sleep(0.1) # 等待0.1秒让机器响应; if self.ser.in_waiting > 0: d = self.ser.read(self.ser.in_waiting) bys = bys + d print("readhex:" + str(binascii.hexlify(bys)) + " readlen=" + str(bys.__len__())) return bys except Exception as e: self.exception = e print('读串口异常', str(e)) return None def read2(self, size=0): try: self.exception = None time.sleep(0.01) # 先读取1个,如果超时表示通讯失败; bys = self.ser.read(1) while self.ser.in_waiting > 0: d = self.ser.read(1) bys = bys + d # 再sleep一会,机器响应可能过慢,导致有字节遗漏; time.sleep(0.015) while self.ser.in_waiting > 0: d = self.ser.read(1) bys = bys + d # 再判断是否读取到指定大小(size!=0); if bys.__len__() < size: time.sleep(0.1) # 等待0.1秒让机器响应; if self.ser.in_waiting > 0: d = self.ser.read(self.ser.in_waiting) bys = bys + d print("readhex:" + str(binascii.hexlify(bys)) + " readlen=" + str(bys.__len__())) return bys except Exception as e: self.exception = e print('读串口异常', str(e)) return None if __name__ == "__main__": pass