123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # -*- coding: utf-8 -*-
- import sys,os,time
- reload(sys)
- sys.setdefaultencoding('utf-8')
- import struct,binascii
- import threading
- from utils.serialboard_manager import *
- from tcl_factory_transfer import *
- #用于进行命令传输层的命令格式化
- class TCLFactoryCommand():
- Encoding = "ascii"
- Packet_Head_High = 0xAF
- """
- 其中:包引导码为命令的开始,用于自调设备发送使用:
- 0xAA --- 调试用命令代码引导码
- 0xAC --- 软件配屏参数调整命令代码引导码
- 0xAE --- **保留命令发送类型引导码**
- 用于待调试电视返回命令使用:
- 0xAB --- 调试用命令返回引导码
- 0xAD --- 软件配屏命令返回引导码
- 0xAF --- **保留命令返回类型引导码**
- """
- Head_TV_Debug = 0xAA
- Head_TV_Return = 0xAB
- Head_TV_Panel_Debug = 0xAC
- Head_TV_Panel_Return = 0xAD
- Head_TV_Debug_Other = 0xAE
- Head_TV_Other_Return = 0xAF
- FE_LEN_FLAG = 0xFE
- Packet_Head_Offset = 0
- Packet_Len_Offset = 1
- Packet_uCommand_Offset = 2
- Packet_CRC_Len = 2
- Packet_Head_Len = 1
- Packet_Len_Len = 1
- Packet_uCommand_Len = 1
- Packet_Min_Len = Packet_CRC_Len + Packet_Head_Len + Packet_Len_Len + Packet_uCommand_Len
- def __init__(self):
- self.Packet_Head = TCLFactoryCommand.Packet_Head_High & TCLFactoryCommand.Head_TV_Debug
- # 包长度为应用层数据字节数的总合+1(包引导码) +1(包长度) +2( CRC16 校验码) =应用层数据字节数总合+4 字节
- self.Packet_Length = [Packet_Min_Len]
- self.Packet_Command = 0x00
- self.CRC1 = 0xFF
- self.CRC2 = 0xFF
- self.Data = None
- #data必须是bytearray
- def setCommandData(self, command, data):
- self.Packet_Command = command
- self.Data = data
- if (self.Packet_Length[0] == TCLFactoryCommand.FE_LEN_FLAG):
- len = self.Data.__len__() + self.Packet_Length.__len__() + TCLFactoryCommand.Packet_Head_Len\
- +TCLFactoryCommand.Packet_uCommand_Len + TCLFactoryCommand.Packet_CRC_Len
- self.Packet_Length[1] = len >> 8
- self.Packet_Length[2] = len & 0xFF
- else:
- self.Packet_Length[0] = self.Packet_Length[0] + self.Data.__len__()
- #print "[TCLFactoryCommand:setCommandData],Packet_Length=",self.Packet_Length
- def setCommandHead(self, head):
- self.Packet_Head = TCLFactoryCommand.Packet_Head_High & head
- def changeFELenType(self):
- self.Packet_Length = [TCLFactoryCommand.FE_LEN_FLAG,0x00,0x00]
- def getCommandByteArray(self):
- bytePre = [self.Packet_Head]+ self.Packet_Length + [self.Packet_Command]
- byteCRC = [self.CRC1, self.CRC2]
- return bytearray(bytePre) + self.Data + bytearray(byteCRC)
- class SerialReadThread(threading.Thread):
- def __init__(self,threadID, counter, factory, name="SerialReadThread"):
- threading.Thread.__init__(self)
- self.threadID = threadID
- self.name = name
- self.counter = counter
- self.factory = factory
- self.transfer = factory.factoryTransfer
- def printThreadInfo(self):
- print "ThreadID:", self.threadID, ";Thread Name:", self.name
- def run(self):
- global Command_Status,Command_Result_Data,Command_Result
- print "\nRun SerialReadThread,Current status=",Command_Status
- while(Factory_Status == Factory_Status_Open):
- while(Command_Status == Status_Command_Return_Wait):
- result, line = self.transfer.readResultLine()
- self.factory.parseCommandResult(line)
- if (line.__len__() >= 5):
- Command_Status = Status_Command_Send
- print "[SerialReadThread:run],Command_Result=0x%x" % Command_Result
- print "\nExit SerialReadThread"
- Status_Init = 0x00
- Status_Command_Send = 0x01
- Status_Command_Return_Wait = 0x02
- Command_Status = Status_Init
- Command_Result_Return_Pass = 0x0A
- Command_Result_Return_Fail = 0x0E
- Command_Result = 0x00
- #有些指令的回馈结果,此时回馈的数据有两条指令
- #第一条是端长度的指令,告诉指令是否被成功执行。第二条回馈指令,返回想要获取的数据
- #例如查看projectID。ab050adf4e ab0785012ea08d
- Command_Result_Data = None
- Command_Return_Command = None
- Factory_Status_Open = 0x01
- Factory_Status_Close = 0x00
- Factory_Status = Factory_Status_Close
- ThreadID_SerialRead = 0x01
- Thread_Counter = 1
- Try_Count_Max = 3
- class TCLFactory():
- def __init__(self):
- global Thread_Counter
- self.Factory_Model_ON = False
- self.Command_Excute_Counter = 0
- self.initTransfer()
- self.openFactory()
- def openTCLFactroyModel(self):
- print "Open Factory model"
- self.sendCommandToTV(0x10,bytearray([0x01]))
- def closeTCLFactoryModel(self):
- print "Close Factory model"
- self.sendCommandToTV(0x10, bytearray([0x00]))
- def initTransfer(self):
- self.factoryTransfer = TCLFactoryTransfer()
- def runReadThread(self):
- global Thread_Counter, Command_Status
- try:
- print ""
- Command_Status = Status_Init
- self.serialReadThread = SerialReadThread(ThreadID_SerialRead, Thread_Counter, self)
- self.serialReadThread.printThreadInfo()
- self.serialReadThread.start()
- Thread_Counter += 1
- except:
- print "Error:Could not start SerialReadThread"
- def packCommandData(self, command, data, FELen = False):
- print "[TCLFactory:packCommandData]command=0x%x"%command ,";data=", data
- factoryCommand = TCLFactoryCommand()
- if (FELen == True):
- factoryCommand.changeFELenType()
- factoryCommand.setCommandData(command, data)
- return factoryCommand
- #line:是返回命令的整个字节数组
- def parseCommandResult(self, result):
- global Command_Result,Command_Result_Data
- print "parseCommandResult,result:", binascii.hexlify(result)
- if (result.__len__() < TCLFactoryCommand.Packet_Min_Len):
- return
- packet_head = result[Packet_Head_Offset]
- if (TCLFactoryCommand.Head_TV_Return == packet_head):
- result_len = result[Packet_Len_Offset]
- #获取反馈回来的数据,例如需要获取的ProjectID
- if (result.__len__() > TCLFactoryCommand.Packet_Min_Len*2) and (result_len == TCLFactoryCommand.Packet_Min_Len):
- Command_Result = result[Packet_uCommand_Offset]
- second_offset_begin = TCLFactoryCommand.Packet_Min_Len
- second_offset_end = result.__len__()-1
- self.parseCommandResult(result[second_offset_begin:second_offset_end])
- elif (result_len > TCLFactoryCommand.Packet_Min_Len):
- Command_Result = Command_Result_Return_Pass
- data_offset_begin = TCLFactoryCommand.Packet_uCommand_Offset+1
- data_offset_end = data_offset_begin + (result_len - TCLFactoryCommand.Packet_Min_Len)
- Command_Result_Data = result[data_offset_begin:data_offset_end]
- else:
- Command_Result = result[Packet_uCommand_Offset]
- else:
- Command_Result = Command_Result_Return_Fail
- """
- 发送指令后,进入结果等待。出现3次错误回馈,返回Fail。反之,表示命令成功。
- """
- def sendCommandToTV(self, command, data, FELen = False):
- global Command_Status,Command_Result
- Command_Result = 0x00
- Command_Status = Status_Command_Return_Wait
- self.Command_Excute_Counter += 1
- print "Begin to wait to read.Command_Status=0x%x"%Command_Status, "; apicounter=", Command_Excute_Counter
- time.sleep(1)
- flag = FELen
- factoryCommand = self.packCommandData(command, data, FELen = flag)
- factoryCommand.setCommandHead(TCLFactoryCommand.Head_TV_Debug)
- commandByteArr = factoryCommand.getCommandByteArray()
- self.factoryTransfer.sendCommandToTV(commandByteArr)
- if (Command_Result == Command_Result_Return_Pass):
- print "Command Excute Pass"
- Command_Status = Status_Command_Send
- self.Command_Excute_Counter = 0
- return True
- if (Command_Result == Command_Result_Return_Fail):
- if (self.Command_Excute_Counter == Try_Count_Max):
- print "Command Excute Fail"
- Command_Status = Status_Command_Send
- self.Command_Excute_Counter = 0
- return False
- else:
- return self.sendCommandToTV(command,data)
- #没有回馈结果的情况
- Command_Status = Status_Command_Send
- self.Command_Excute_Counter = 0
- print "End to wait to read.Command_Status=", Command_Status, "; counter=", Command_Excute_Counter
- return False
- #返回前一条指令的回馈报文结果:ucCommand,ucData。
- def readCommandResult(self):
- print "[TCLFactory:readCommandResult]"
- if (Command_Result_Data <> None):
- print "[TCLFactory:readCommandResult],Command_Result=0x%x"%Command_Result, ";Command_Result_Data:", binascii.hexlify(Command_Result_Data)
- if (Command_Result == Command_Result_Return_Pass):
- result = "Pass"
- else:
- result = "Fail"
- return result, Command_Result_Data
- def closeFactory(self):
- global Factory_Status
- Factory_Status = Factory_Status_Close
- def openFactory(self):
- global Factory_Status
- Factory_Status = Factory_Status_Open
- self.runReadThread()
- if __name__ == "__main__":
- factory = TCLFactory()
- factory.openTCLFactroyModel()
- #factory.sendCommandToTV(0x25,0x02)
- factory.closeFactory()
|