123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- # -*- coding: UTF-8 -*-
- import serial
- import time
- import binascii
- '''
- 描述:串口同步基类
- '''
- class BaseSerial():
- # 构造函数
- def __init__(self):
- self.exception = None
- self.ser = serial.Serial()
- '''
- 打开串口
- '''
- def open(self, port, baudrate=115200, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=3, writeTimeout=0.5):
- if type(port) == type(0):
- self.ser.port = "COM" + str(port)
- else:
- self.ser.port = port
- # 波特率;
- self.ser.baudrate = baudrate
- # 数据位;
- self.ser.bytesize = bytesize
- # 校验码;
- self.ser.parity = parity # PARITY_NONE=0
- # 停止位;
- self.ser.stopbits = stopbits
- # 串口读超时值
- self.ser.timeout = timeout
- # 串口写超时值
- self.ser.write_timeout = writeTimeout
- try:
- self.ser.open()
- # 返回结果;
- return self.ser.is_open
- except Exception as e:
- self.exception = e
- print('打开串口异常', str(e))
- return False
- '''
- 重新打开串口
- '''
- def reOpen(self):
- self.exception = None
- self.ser.close()
- try:
- self.ser.open()
- # 返回结果;
- return self.ser.is_open
- except Exception as e:
- self.exception = e
- return False
- '''关闭串口'''
- def close(self):
- self.ser.close()
- '''
- 描述:写串口
- '''
- def write(self, cmd: bytearray):
- try:
- self.exception = None
- writelen = self.ser.write(cmd)
- # flush等待写入完成;
- self.ser.flush()
- print("writehex:" + str(binascii.b2a_hex(cmd)) + " writelen=" + str(writelen))
- return True if writelen == len(cmd) else False
- except Exception as e:
- self.exception = e
- print('写串口异常', str(e))
- return False
- '''
- 描述:读取串口数据;
- 返回:返回16进制字符串;
- 补充:ser.in_waiting 实际是缓冲区里的数据长度,也就是要读的长度;
- '''
- def read(self, size=0):
- try:
- time.sleep(0.1) # 必备的等待-机器响应;
- self.exception = None
- # 先读取1个,如果超时表示通讯失败;
- bys = self.ser.read(1)
- for i in range(0, 3):
- while self.ser.in_waiting > 0:
- d = self.ser.read(1)
- bys = bys + d
- # 每当缓冲区为0,等10ms,循环3次确保机器响应完全;
- time.sleep(0.01)
-
- # 再判断是否读取到指定大小(size!=0);
- if bys.__len__() < size:
- time.sleep(0.1) # 等待0.1秒让机器响应;
- if self.ser.in_waiting > 0:
- d = self.ser.read(self.ser.in_waiting)
- bys = bys + d
- print("readhex:" + str(binascii.hexlify(bys)) + " readlen=" + str(bys.__len__()))
- return bys
- except Exception as e:
- self.exception = e
- print('读串口异常', str(e))
- return None
-
- def read2(self, size=0):
- try:
- self.exception = None
- time.sleep(0.01)
- # 先读取1个,如果超时表示通讯失败;
- bys = self.ser.read(1)
- while self.ser.in_waiting > 0:
- d = self.ser.read(1)
- bys = bys + d
- # 再sleep一会,机器响应可能过慢,导致有字节遗漏;
- time.sleep(0.015)
- while self.ser.in_waiting > 0:
- d = self.ser.read(1)
- bys = bys + d
-
- # 再判断是否读取到指定大小(size!=0);
- if bys.__len__() < size:
- time.sleep(0.1) # 等待0.1秒让机器响应;
- if self.ser.in_waiting > 0:
- d = self.ser.read(self.ser.in_waiting)
- bys = bys + d
- print("readhex:" + str(binascii.hexlify(bys)) + " readlen=" + str(bys.__len__()))
- return bys
- except Exception as e:
- self.exception = e
- print('读串口异常', str(e))
- return None
- if __name__ == "__main__":
- pass
|