123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- # -*- coding: UTF-8 -*-
- import serial
- import time
- from ssat_sdk.sat_environment import *
- from ssat_sdk.utils import LoggingUtil
- import inspect
- from ssat_sdk.utils.AbnormalClient import abnormal_client
- pyFileName = os.path.split(__file__)[-1]
- def get_current_function_name():
- return inspect.stack()[1][3]
- class Chroma22293():
- # 构造函数
- def __init__(self, port):
- self.loop = 0
- self.timeout = 0.2
- self.ser = serial.Serial()
- self.ser.port = port
- self.ser.baudrate = 115200
- self.ser.bytesize = 8
- self.ser.parity = serial.PARITY_NONE
- self.ser.stopbits = 1
- self.ser.timeout = 1 # 读超时;
- self.ser.writeTimeout = 0.5 # 写超时;
- self.className = self.__class__.__name__
- # 析构函数;
- def __del__(self):
- self.close()
-
- def sendAbnormal(self, msg):
- curport = 'COM'+str(self.ser.port)
- dic = getChroma22293()
- if dic['devices'][0]['port'] == curport:
- abnormal_client.sendAbnormal('signalcard', msg)
- else:
- abnormal_client.sendAbnormal('signalcard_2', msg)
- def open(self):
- try:
- self.ser.open()
- if self.ser.is_open == True:
- # print "%s : Open Success" % self.ser.port
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- str(self.ser.port)+'Open Success')
- print self.ser.get_settings()
- print "rts=" + str(self.ser.rts)
- print "cts=" + str(self.ser.cts)
- else:
- # print "%s : Open Fail" % self.ser.port
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- str(self.ser.port)+'Open Fail')
- # 返回结果;
- return self.ser.is_open
- except Exception, e:
- # print u'打开串口失败,描述:', e
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- '打开串口失败,描述:'+str(e))
-
- self.sendAbnormal(str(e))
- return False
- def writeCmd(self, cmdstr):
- try:
- len = self.ser.write(cmdstr + b"\r") # 注意后面要回车键(\r),命令才会发送出去执行
- print "Sending cmd:" + cmdstr + " len=" + str(len)
- return True
- except Exception, e:
- self.sendAbnormal(str(e))
- # print u'写串口失败,描述:', e
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- '写串口失败,描述:'+str(e))
- #重新打开串口;
- self.close()
- self.open()
- return False
- def readResult(self):
- """
- If the result is normal, "OK;.." will be output.
- OK;[CR][LF] 正常
- If the result is an error, the following message will be output.
- NG: SYNTAX ERRO;[CR][LF] 指令语法错误
- NG: BOUNDARY ERROR;[CR][LF] 输入参数超出范围
- NG: EXECUTE ERROR;[CR][LF] 系统执行指令时发生错误
- """
- try:
- self.loop = 0
- time.sleep(self.timeout)
- while self.ser.in_waiting == 0:
- time.sleep(self.timeout)
- self.loop += 1
- if self.loop == 10:
- break
- result = self.ser.readlines()
- print "cmd result:" + str(result)
- if len(result) == 0:
- result = ['ng1: read none', 'ng2: read none']
- return result
- except Exception, e:
- self.sendAbnormal(str(e))
- # print u'读串口失败,错误描述:', e
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- '读串口失败,错误描述:'+str(e))
- #重新打开串口;
- self.close()
- self.open()
- return ['ng1: SerialException', 'ng2: SerialException']
- def execmd(self, cmdstr):
- if self.writeCmd(cmdstr):
- return self.readResult()
- else:
- return ['ng1: SerialException', 'ng2: SerialException']
- def close(self):
- self.ser.close()
- if self.ser.is_open == False:
- # print "%s :Close Success" % self.ser.port
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- str(self.ser.port)+'Close Success')
- else:
- # print "%s :Close Fail" % self.ser.port
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- str(self.ser.port)+'Close Fail')
- def __del__(self):
- if self.ser.is_open == True:
- self.ser.close()
- def parseResult(self, retStrArr):
- for retStr in retStrArr:
- if "ok;" in retStr.lower():
- return True
- return False
- # 返回配置文件;
- def getConfigName(self):
- pass
- # 返回机型;
- def getDeviceName(self):
- cmd = 'report model;'
- return self.execmd(cmd)
- # 返回软件版本号,如V 1.0
- def getDeviceSoft(self):
- cmd = 'report ver;'
- return self.execmd(cmd)
- # 做BUF测试,无误返回ok;
- def getStatus(self):
- cmd = 'dummy;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setPattern(self, param):
- cmd = 'run ptn %s;' % param
- result = self.execmd(cmd)
- self.getStatus()
- return self.parseResult(result)
- def setTiming(self, param):
- cmd = 'run tim %s;' % param
- result = self.execmd(cmd)
- self.getStatus()
- return self.parseResult(result)
- def setTimingPattern(self, param1, param2):
- # timing
- cmd = 'run tim %s;' % param1
- result1 = self.execmd(cmd)
- self.getStatus()
- time.sleep(0.5)
- # patten
- cmd = 'run ptn %s;' % param2
- result2 = self.execmd(cmd)
- self.getStatus()
- # result;
- ret = self.parseResult(result1) and self.parseResult(result2)
- if ret == False:
- LoggingUtil.getDebugLogger().info(
- pyFileName,
- self.className,
- get_current_function_name(),
- 'setTimingPattern失败结果:'+str(result1)+str(result2))
- return ret
- def setBlueOFF(self):
- cmd = 'b off;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setBuleON(self):
- cmd = 'b on;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setGreenOFF(self):
- cmd = 'g off;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setGreenON(self):
- cmd = 'g on;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setRedOFF(self):
- cmd = 'r off;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setRedON(self):
- cmd = 'r on;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setKeyBoardLock(self):
- cmd = 'kb lock on;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- def setKeyBoardUnLock(self):
- cmd = 'kb lock off;'
- result = self.execmd(cmd)
- return self.parseResult(result)
- if __name__ == "__main__":
- pass
- ser = Chroma22293("COM6")
- if ser.open():
- print ser.getDeviceName() #使用这个命令,导致其他命令无法正常使用;
- print ser.getDeviceSoft()
- print ser.getStatus()
- #
- print ser.setPattern(5)
- print ser.setTiming(15)
- print ser.setTimingPattern(2,12)
- print ser.setBlueOFF()
- print ser.setBuleON()
- print ser.setGreenOFF()
- print ser.setGreenON()
- print ser.setRedOFF()
- print ser.setRedON()
- print ser.setKeyBoardLock()
- print ser.setKeyBoardUnLock()
- ser.close()
- # listObj = {}
- # list = getChroma22293()
- # print list
- # for item in list:
- # obj = Chroma22293(item["port"])
- # # obj.open()
- # if obj.open():
- # listObj[item["name"]] = obj
- # print listObj
- # listObj["Google"].getDeviceName()
|