# -*- coding: UTF-8 -*- import serial import time from ssat_sdk.sat_environment import * from ssat_sdk.utils import LoggingUtil import inspect from ssat_sdk.utils.AbnormalClient import abnormal_client pyFileName = os.path.split(__file__)[-1] def get_current_function_name(): return inspect.stack()[1][3] class Chroma22293(): # 构造函数 def __init__(self, port): self.loop = 0 self.timeout = 0.2 self.ser = serial.Serial() self.ser.port = port self.ser.baudrate = 115200 self.ser.bytesize = 8 self.ser.parity = serial.PARITY_NONE self.ser.stopbits = 1 self.ser.timeout = 1 # 读超时; self.ser.writeTimeout = 0.5 # 写超时; self.className = self.__class__.__name__ # 析构函数; def __del__(self): self.close() def sendAbnormal(self, msg): curport = 'COM'+str(self.ser.port) dic = getChroma22293() if dic['devices'][0]['port'] == curport: abnormal_client.sendAbnormal('signalcard', msg) else: abnormal_client.sendAbnormal('signalcard_2', msg) def open(self): try: self.ser.open() if self.ser.is_open == True: # print "%s : Open Success" % self.ser.port LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), str(self.ser.port)+'Open Success') print self.ser.get_settings() print "rts=" + str(self.ser.rts) print "cts=" + str(self.ser.cts) else: # print "%s : Open Fail" % self.ser.port LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), str(self.ser.port)+'Open Fail') # 返回结果; return self.ser.is_open except Exception, e: # print u'打开串口失败,描述:', e LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), '打开串口失败,描述:'+str(e)) self.sendAbnormal(str(e)) return False def writeCmd(self, cmdstr): try: len = self.ser.write(cmdstr + b"\r") # 注意后面要回车键(\r),命令才会发送出去执行 print "Sending cmd:" + cmdstr + " len=" + str(len) return True except Exception, e: self.sendAbnormal(str(e)) # print u'写串口失败,描述:', e LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), '写串口失败,描述:'+str(e)) #重新打开串口; self.close() self.open() return False def readResult(self): """ If the result is normal, "OK;.." will be output. OK;[CR][LF] 正常 If the result is an error, the following message will be output. NG: SYNTAX ERRO;[CR][LF] 指令语法错误 NG: BOUNDARY ERROR;[CR][LF] 输入参数超出范围 NG: EXECUTE ERROR;[CR][LF] 系统执行指令时发生错误 """ try: self.loop = 0 time.sleep(self.timeout) while self.ser.in_waiting == 0: time.sleep(self.timeout) self.loop += 1 if self.loop == 10: break result = self.ser.readlines() print "cmd result:" + str(result) if len(result) == 0: result = ['ng1: read none', 'ng2: read none'] return result except Exception, e: self.sendAbnormal(str(e)) # print u'读串口失败,错误描述:', e LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), '读串口失败,错误描述:'+str(e)) #重新打开串口; self.close() self.open() return ['ng1: SerialException', 'ng2: SerialException'] def execmd(self, cmdstr): if self.writeCmd(cmdstr): return self.readResult() else: return ['ng1: SerialException', 'ng2: SerialException'] def close(self): self.ser.close() if self.ser.is_open == False: # print "%s :Close Success" % self.ser.port LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), str(self.ser.port)+'Close Success') else: # print "%s :Close Fail" % self.ser.port LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), str(self.ser.port)+'Close Fail') def __del__(self): if self.ser.is_open == True: self.ser.close() def parseResult(self, retStrArr): for retStr in retStrArr: if "ok;" in retStr.lower(): return True return False # 返回配置文件; def getConfigName(self): pass # 返回机型; def getDeviceName(self): cmd = 'report model;' return self.execmd(cmd) # 返回软件版本号,如V 1.0 def getDeviceSoft(self): cmd = 'report ver;' return self.execmd(cmd) # 做BUF测试,无误返回ok; def getStatus(self): cmd = 'dummy;' result = self.execmd(cmd) return self.parseResult(result) def setPattern(self, param): cmd = 'run ptn %s;' % param result = self.execmd(cmd) self.getStatus() return self.parseResult(result) def setTiming(self, param): cmd = 'run tim %s;' % param result = self.execmd(cmd) self.getStatus() return self.parseResult(result) def setTimingPattern(self, param1, param2): # timing cmd = 'run tim %s;' % param1 result1 = self.execmd(cmd) self.getStatus() time.sleep(0.5) # patten cmd = 'run ptn %s;' % param2 result2 = self.execmd(cmd) self.getStatus() # result; ret = self.parseResult(result1) and self.parseResult(result2) if ret == False: LoggingUtil.getDebugLogger().info( pyFileName, self.className, get_current_function_name(), 'setTimingPattern失败结果:'+str(result1)+str(result2)) return ret def setBlueOFF(self): cmd = 'b off;' result = self.execmd(cmd) return self.parseResult(result) def setBuleON(self): cmd = 'b on;' result = self.execmd(cmd) return self.parseResult(result) def setGreenOFF(self): cmd = 'g off;' result = self.execmd(cmd) return self.parseResult(result) def setGreenON(self): cmd = 'g on;' result = self.execmd(cmd) return self.parseResult(result) def setRedOFF(self): cmd = 'r off;' result = self.execmd(cmd) return self.parseResult(result) def setRedON(self): cmd = 'r on;' result = self.execmd(cmd) return self.parseResult(result) def setKeyBoardLock(self): cmd = 'kb lock on;' result = self.execmd(cmd) return self.parseResult(result) def setKeyBoardUnLock(self): cmd = 'kb lock off;' result = self.execmd(cmd) return self.parseResult(result) if __name__ == "__main__": pass ser = Chroma22293("COM6") if ser.open(): print ser.getDeviceName() #使用这个命令,导致其他命令无法正常使用; print ser.getDeviceSoft() print ser.getStatus() # print ser.setPattern(5) print ser.setTiming(15) print ser.setTimingPattern(2,12) print ser.setBlueOFF() print ser.setBuleON() print ser.setGreenOFF() print ser.setGreenON() print ser.setRedOFF() print ser.setRedON() print ser.setKeyBoardLock() print ser.setKeyBoardUnLock() ser.close() # listObj = {} # list = getChroma22293() # print list # for item in list: # obj = Chroma22293(item["port"]) # # obj.open() # if obj.open(): # listObj[item["name"]] = obj # print listObj # listObj["Google"].getDeviceName()