# -*- coding: utf-8 -*- import sys,os,time reload(sys) sys.setdefaultencoding('utf-8') import struct,binascii import threading from utils.serialboard_manager import * from tcl_factory_transfer import * #用于进行命令传输层的命令格式化 class TCLFactoryCommand(): Encoding = "ascii" Packet_Head_High = 0xAF """ 其中:包引导码为命令的开始,用于自调设备发送使用: 0xAA --- 调试用命令代码引导码 0xAC --- 软件配屏参数调整命令代码引导码 0xAE --- **保留命令发送类型引导码** 用于待调试电视返回命令使用: 0xAB --- 调试用命令返回引导码 0xAD --- 软件配屏命令返回引导码 0xAF --- **保留命令返回类型引导码** """ Head_TV_Debug = 0xAA Head_TV_Return = 0xAB Head_TV_Panel_Debug = 0xAC Head_TV_Panel_Return = 0xAD Head_TV_Debug_Other = 0xAE Head_TV_Other_Return = 0xAF FE_LEN_FLAG = 0xFE Packet_Head_Offset = 0 Packet_Len_Offset = 1 Packet_uCommand_Offset = 2 Packet_CRC_Len = 2 Packet_Head_Len = 1 Packet_Len_Len = 1 Packet_uCommand_Len = 1 Packet_Min_Len = Packet_CRC_Len + Packet_Head_Len + Packet_Len_Len + Packet_uCommand_Len def __init__(self): self.Packet_Head = TCLFactoryCommand.Packet_Head_High & TCLFactoryCommand.Head_TV_Debug # 包长度为应用层数据字节数的总合+1(包引导码) +1(包长度) +2( CRC16 校验码) =应用层数据字节数总合+4 字节 self.Packet_Length = [Packet_Min_Len] self.Packet_Command = 0x00 self.CRC1 = 0xFF self.CRC2 = 0xFF self.Data = None #data必须是bytearray def setCommandData(self, command, data): self.Packet_Command = command self.Data = data if (self.Packet_Length[0] == TCLFactoryCommand.FE_LEN_FLAG): len = self.Data.__len__() + self.Packet_Length.__len__() + TCLFactoryCommand.Packet_Head_Len\ +TCLFactoryCommand.Packet_uCommand_Len + TCLFactoryCommand.Packet_CRC_Len self.Packet_Length[1] = len >> 8 self.Packet_Length[2] = len & 0xFF else: self.Packet_Length[0] = self.Packet_Length[0] + self.Data.__len__() #print "[TCLFactoryCommand:setCommandData],Packet_Length=",self.Packet_Length def setCommandHead(self, head): self.Packet_Head = TCLFactoryCommand.Packet_Head_High & head def changeFELenType(self): self.Packet_Length = [TCLFactoryCommand.FE_LEN_FLAG,0x00,0x00] def getCommandByteArray(self): bytePre = [self.Packet_Head]+ self.Packet_Length + [self.Packet_Command] byteCRC = [self.CRC1, self.CRC2] return bytearray(bytePre) + self.Data + bytearray(byteCRC) class SerialReadThread(threading.Thread): def __init__(self,threadID, counter, factory, name="SerialReadThread"): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter self.factory = factory self.transfer = factory.factoryTransfer def printThreadInfo(self): print "ThreadID:", self.threadID, ";Thread Name:", self.name def run(self): global Command_Status,Command_Result_Data,Command_Result print "\nRun SerialReadThread,Current status=",Command_Status while(Factory_Status == Factory_Status_Open): while(Command_Status == Status_Command_Return_Wait): result, line = self.transfer.readResultLine() self.factory.parseCommandResult(line) if (line.__len__() >= 5): Command_Status = Status_Command_Send print "[SerialReadThread:run],Command_Result=0x%x" % Command_Result print "\nExit SerialReadThread" Status_Init = 0x00 Status_Command_Send = 0x01 Status_Command_Return_Wait = 0x02 Command_Status = Status_Init Command_Result_Return_Pass = 0x0A Command_Result_Return_Fail = 0x0E Command_Result = 0x00 #有些指令的回馈结果,此时回馈的数据有两条指令 #第一条是端长度的指令,告诉指令是否被成功执行。第二条回馈指令,返回想要获取的数据 #例如查看projectID。ab050adf4e ab0785012ea08d Command_Result_Data = None Command_Return_Command = None Factory_Status_Open = 0x01 Factory_Status_Close = 0x00 Factory_Status = Factory_Status_Close ThreadID_SerialRead = 0x01 Thread_Counter = 1 Try_Count_Max = 3 class TCLFactory(): def __init__(self): global Thread_Counter self.Factory_Model_ON = False self.Command_Excute_Counter = 0 self.initTransfer() self.openFactory() def openTCLFactroyModel(self): print "Open Factory model" self.sendCommandToTV(0x10,bytearray([0x01])) def closeTCLFactoryModel(self): print "Close Factory model" self.sendCommandToTV(0x10, bytearray([0x00])) def initTransfer(self): self.factoryTransfer = TCLFactoryTransfer() def runReadThread(self): global Thread_Counter, Command_Status try: print "" Command_Status = Status_Init self.serialReadThread = SerialReadThread(ThreadID_SerialRead, Thread_Counter, self) self.serialReadThread.printThreadInfo() self.serialReadThread.start() Thread_Counter += 1 except: print "Error:Could not start SerialReadThread" def packCommandData(self, command, data, FELen = False): print "[TCLFactory:packCommandData]command=0x%x"%command ,";data=", data factoryCommand = TCLFactoryCommand() if (FELen == True): factoryCommand.changeFELenType() factoryCommand.setCommandData(command, data) return factoryCommand #line:是返回命令的整个字节数组 def parseCommandResult(self, result): global Command_Result,Command_Result_Data print "parseCommandResult,result:", binascii.hexlify(result) if (result.__len__() < TCLFactoryCommand.Packet_Min_Len): return packet_head = result[Packet_Head_Offset] if (TCLFactoryCommand.Head_TV_Return == packet_head): result_len = result[Packet_Len_Offset] #获取反馈回来的数据,例如需要获取的ProjectID if (result.__len__() > TCLFactoryCommand.Packet_Min_Len*2) and (result_len == TCLFactoryCommand.Packet_Min_Len): Command_Result = result[Packet_uCommand_Offset] second_offset_begin = TCLFactoryCommand.Packet_Min_Len second_offset_end = result.__len__()-1 self.parseCommandResult(result[second_offset_begin:second_offset_end]) elif (result_len > TCLFactoryCommand.Packet_Min_Len): Command_Result = Command_Result_Return_Pass data_offset_begin = TCLFactoryCommand.Packet_uCommand_Offset+1 data_offset_end = data_offset_begin + (result_len - TCLFactoryCommand.Packet_Min_Len) Command_Result_Data = result[data_offset_begin:data_offset_end] else: Command_Result = result[Packet_uCommand_Offset] else: Command_Result = Command_Result_Return_Fail """ 发送指令后,进入结果等待。出现3次错误回馈,返回Fail。反之,表示命令成功。 """ def sendCommandToTV(self, command, data, FELen = False): global Command_Status,Command_Result Command_Result = 0x00 Command_Status = Status_Command_Return_Wait self.Command_Excute_Counter += 1 print "Begin to wait to read.Command_Status=0x%x"%Command_Status, "; apicounter=", Command_Excute_Counter time.sleep(1) flag = FELen factoryCommand = self.packCommandData(command, data, FELen = flag) factoryCommand.setCommandHead(TCLFactoryCommand.Head_TV_Debug) commandByteArr = factoryCommand.getCommandByteArray() self.factoryTransfer.sendCommandToTV(commandByteArr) if (Command_Result == Command_Result_Return_Pass): print "Command Excute Pass" Command_Status = Status_Command_Send self.Command_Excute_Counter = 0 return True if (Command_Result == Command_Result_Return_Fail): if (self.Command_Excute_Counter == Try_Count_Max): print "Command Excute Fail" Command_Status = Status_Command_Send self.Command_Excute_Counter = 0 return False else: return self.sendCommandToTV(command,data) #没有回馈结果的情况 Command_Status = Status_Command_Send self.Command_Excute_Counter = 0 print "End to wait to read.Command_Status=", Command_Status, "; counter=", Command_Excute_Counter return False #返回前一条指令的回馈报文结果:ucCommand,ucData。 def readCommandResult(self): print "[TCLFactory:readCommandResult]" if (Command_Result_Data <> None): print "[TCLFactory:readCommandResult],Command_Result=0x%x"%Command_Result, ";Command_Result_Data:", binascii.hexlify(Command_Result_Data) if (Command_Result == Command_Result_Return_Pass): result = "Pass" else: result = "Fail" return result, Command_Result_Data def closeFactory(self): global Factory_Status Factory_Status = Factory_Status_Close def openFactory(self): global Factory_Status Factory_Status = Factory_Status_Open self.runReadThread() if __name__ == "__main__": factory = TCLFactory() factory.openTCLFactroyModel() #factory.sendCommandToTV(0x25,0x02) factory.closeFactory()