|
@@ -3,6 +3,7 @@ import gzip
|
|
|
from xmlrpc.client import Boolean
|
|
|
import serial
|
|
|
import time
|
|
|
+from enum import Enum
|
|
|
import binascii
|
|
|
from baseSerail import BaseSerial
|
|
|
|
|
@@ -48,6 +49,11 @@ RCode = {
|
|
|
"RC_LOSE": 0x0F, # 命令丢失(链路层出错);
|
|
|
}
|
|
|
|
|
|
+class CMDType(Enum):
|
|
|
+ Execute=0
|
|
|
+ Write=1
|
|
|
+ Read=2
|
|
|
+ Check=3
|
|
|
|
|
|
# 命令的封装与解析;
|
|
|
|
|
@@ -70,6 +76,8 @@ class MokaParse():
|
|
|
self.CRCL = 0xFF
|
|
|
# 是否是特殊命令;
|
|
|
self.FEFlag = False
|
|
|
+ # 命令类型;
|
|
|
+ self.cmdType = CMDType.Execute
|
|
|
# 是否是多参数命令;(同一个Command,有多个含义的参数)
|
|
|
self.isMultipleParams = False
|
|
|
# 正确执行后的结果值,bytearray格式的二维数组;
|
|
@@ -87,12 +95,13 @@ class MokaParse():
|
|
|
返回:
|
|
|
'''
|
|
|
|
|
|
- def parseCommand(self, head: int, command: list[int], subCommand: list[int] = [], data: bytearray = b'', isMultipleParams=False, FEFlag=False):
|
|
|
+ def parseCommand(self, cmdType: CMDType, head: int, command: list[int], subCommand: list[int] = [], data: bytearray = b'', isMultipleParams=False, FEFlag=False):
|
|
|
self.Header[0] = head
|
|
|
self.Command = command
|
|
|
self.SubCommand = subCommand
|
|
|
self.Data = data
|
|
|
self.FEFlag = FEFlag
|
|
|
+ self.cmdType = cmdType
|
|
|
self.isMultipleParams = isMultipleParams
|
|
|
if FEFlag:
|
|
|
self.Header.append(0xFE)
|
|
@@ -121,6 +130,73 @@ class MokaParse():
|
|
|
|
|
|
return package
|
|
|
|
|
|
+ '''校验第一段返回'''
|
|
|
+ def parseFirstSection(self, retCode: int, package: bytearray):
|
|
|
+ # 长度校验;
|
|
|
+ if package.__len__() != 5:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 再次验证包头是否有效;
|
|
|
+ if package[0] != retCode:
|
|
|
+ print('Incorrect package head!\n')
|
|
|
+ return False
|
|
|
+
|
|
|
+ if package[2] != RCode['RC_OK']: # 检测结果码(第三字节);
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检测crc是否正确;
|
|
|
+ crc = crc16(package, package.__len__() - 2)
|
|
|
+ CRCH = crc >> 8
|
|
|
+ CRCL = crc & 0xFF
|
|
|
+ if CRCH != package[-2] and CRCL != package[-1]:
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ '''校验第二段返回'''
|
|
|
+ def parseSecondSection(self, retCode: int, package: bytearray):
|
|
|
+ # 验证包头是否有效;
|
|
|
+ if package[0] != retCode:
|
|
|
+ print('Incorrect package head!\n')
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 长度有效性及标记位判断;
|
|
|
+ FEFlag = False
|
|
|
+ plen = package.__len__()
|
|
|
+ if plen > 255:
|
|
|
+ FEFlag = True
|
|
|
+ if package[1] != 0xFE:
|
|
|
+ return False
|
|
|
+ # 获取数据长度;
|
|
|
+ dlen = package[1] << 8 + package[2]
|
|
|
+ else:
|
|
|
+ dlen = package[1]
|
|
|
+
|
|
|
+ if dlen != plen:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检测crc是否正确;
|
|
|
+ crc = crc16(package, dlen - 2)
|
|
|
+ CRCH = crc >> 8
|
|
|
+ CRCL = crc & 0xFF
|
|
|
+ if CRCH != package[-2] and CRCL != package[-1]:
|
|
|
+ return False
|
|
|
+
|
|
|
+ if self.Command[0] == 0xFC:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if FEFlag: #返回超过255
|
|
|
+ if package[3] - 1 != self.Command[0]:
|
|
|
+ return False
|
|
|
+ self.successData.append(package[5:-2])
|
|
|
+ else:
|
|
|
+ if package[2] - 1 != self.Command[0]:
|
|
|
+ return False
|
|
|
+ self.successData.append(package[3:-2])
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
'''
|
|
|
描述:解析读串口数据是否正确返回,若正确返回剥离出结果值;
|
|
|
参数:data 16进制的字符串;
|
|
@@ -143,54 +219,20 @@ class MokaParse():
|
|
|
elif self.Header[0] == PHeader['TV_Debug_Other']:
|
|
|
retCode = PHeader['TV_Other_Return']
|
|
|
|
|
|
- # 检测包引导码;
|
|
|
- if retCode != data[0]:
|
|
|
+ # 验证包头是否有效;
|
|
|
+ if data[0] != retCode:
|
|
|
+ print('Incorrect package head!\n')
|
|
|
return False
|
|
|
|
|
|
- package = []
|
|
|
- tooken_len = 0
|
|
|
- while True:
|
|
|
- if tooken_len >= data.__len__():
|
|
|
- break
|
|
|
- # 取出包长;
|
|
|
- if self.FEFlag:
|
|
|
- package_len = data[tooken_len + 1] << 8 + data[tooken_len + 2]
|
|
|
- else:
|
|
|
- package_len = data[tooken_len + 1]
|
|
|
- # 取出整包;
|
|
|
- package = data[tooken_len:tooken_len + package_len]
|
|
|
-
|
|
|
- # 再次验证包头是否有效;
|
|
|
- if package[0] != retCode:
|
|
|
- print('Incorrect package head!\n')
|
|
|
- return False
|
|
|
-
|
|
|
- # 检测crc是否正确;
|
|
|
- crc = crc16(package, package_len - 2)
|
|
|
- CRCH = crc >> 8
|
|
|
- CRCL = crc & 0xFF
|
|
|
- if CRCH != package[-2] and CRCL != package[-1]:
|
|
|
- return False
|
|
|
-
|
|
|
- # 第一次,检测结果码(第三字节);
|
|
|
- if tooken_len == 0:
|
|
|
- if package[2] != RCode['RC_OK']:
|
|
|
- return False
|
|
|
- else: # 第二次,检测命令码
|
|
|
- if self.Command[0] == 0xFC:
|
|
|
- pass
|
|
|
- else:
|
|
|
- if package[2] - 1 != self.Command[0]:
|
|
|
- return False
|
|
|
- if package[2] == 0xFE: #返回超过255
|
|
|
- self.successData.append(package[5:-2])
|
|
|
- else:
|
|
|
- self.successData.append(package[3:-2])
|
|
|
-
|
|
|
- tooken_len += package_len
|
|
|
+ if self.cmdType == CMDType.Execute or self.cmdType == CMDType.Write:
|
|
|
+ result = self.parseFirstSection(retCode,data)
|
|
|
+ elif self.cmdType == CMDType.Read or self.cmdType == CMDType.Check:
|
|
|
+ result = self.parseFirstSection(retCode,data[:5])
|
|
|
+ if data.__len__() > 5:
|
|
|
+ result = self.parseSecondSection(retCode,data[5:])
|
|
|
|
|
|
print('successData:', binascii.b2a_hex(bytearray(self.Command)), self.successData)
|
|
|
- return True
|
|
|
+ return result
|
|
|
|
|
|
|
|
|
class MokaSerial(BaseSerial):
|
|
@@ -216,9 +258,9 @@ class MokaSerial(BaseSerial):
|
|
|
return self.read()
|
|
|
|
|
|
'''协议模式发送命令'''
|
|
|
- def sendcmdEx(self, head: int, command: list[int], subCommand: list[int] = [], data: bytearray = b'', FEFlag: Boolean = False, returnParam: Boolean = False):
|
|
|
+ def sendcmdEx(self, cmdType: CMDType, head: int, command: list[int], subCommand: list[int] = [], data: bytearray = b'', FEFlag: Boolean = False, returnParam: Boolean = False):
|
|
|
cmd = MokaParse()
|
|
|
- package = cmd.parseCommand(head, command, subCommand, data, returnParam, FEFlag)
|
|
|
+ package = cmd.parseCommand(cmdType, head, command, subCommand, data, returnParam, FEFlag)
|
|
|
if self.write(package):
|
|
|
package = self.read()
|
|
|
return cmd.parseResult(package)
|
|
@@ -253,7 +295,7 @@ class MokaSerial(BaseSerial):
|
|
|
'''发送pattern'''
|
|
|
def send_parttern(self, rgb: list[int]):
|
|
|
cmd = MokaParse()
|
|
|
- package = cmd.parseCommand(0xAA, [0x28], [], bytearray(rgb))
|
|
|
+ package = cmd.parseCommand(CMDType.Execute, 0xAA, [0x28], [], bytearray(rgb))
|
|
|
if self.write(package):
|
|
|
package = self.read()
|
|
|
return cmd.parseResult(package)
|
|
@@ -265,7 +307,7 @@ class MokaSerial(BaseSerial):
|
|
|
fp = open(file_path, 'rb')
|
|
|
cmd = MokaParse()
|
|
|
data = fp.read()
|
|
|
- package = cmd.parseCommand(0xAA, [0xE9], [0x02], data, False, True)
|
|
|
+ package = cmd.parseCommand(CMDType.Execute, 0xAA, [0xE9], [0x02], data, False, True)
|
|
|
if self.write(package):
|
|
|
package = self.read()
|
|
|
return cmd.parseResult(package)
|
|
@@ -281,38 +323,38 @@ class MokaSerial(BaseSerial):
|
|
|
CRC_GM = [crc >> 8, crc & 0xFF]
|
|
|
|
|
|
cmd = MokaParse()
|
|
|
- package = cmd.parseCommand(0xAA, [0x99], [0x06], bytearray(CRC_GM))
|
|
|
+ package = cmd.parseCommand(CMDType.Execute, 0xAA, [0x99], [0x06], bytearray(CRC_GM))
|
|
|
if self.write(package):
|
|
|
package = self.read()
|
|
|
return cmd.parseResult(package)
|
|
|
|
|
|
'''进工厂模式'''
|
|
|
def enterFactory(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x10], [0x01])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x10], [0x01])
|
|
|
|
|
|
'''白平衡初始化'''
|
|
|
def initWhiteBalance(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x16], [0x01])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x16], [0x01])
|
|
|
|
|
|
'''关闭localdimming'''
|
|
|
def closeLocaldimming(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x9F,0x07], [0x00])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x9F,0x07], [0x00])
|
|
|
|
|
|
'''打开内置pattern'''
|
|
|
def openBuiltInPattern(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x27], [0x02])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x27], [0x02])
|
|
|
|
|
|
'''关闭内置pattern'''
|
|
|
def closeBuiltInPattern(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x27], [0x00])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x27], [0x00])
|
|
|
|
|
|
'''切换标准色温'''
|
|
|
def switchStdColorTemperature(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x31], [0x01])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x31], [0x01])
|
|
|
|
|
|
'''初始化gamma'''
|
|
|
def initGamma(self):
|
|
|
- return self.sendcmdEx(0xAA, [0x9F,0x09], [0x01])
|
|
|
+ return self.sendcmdEx(CMDType.Execute, 0xAA, [0x9F,0x09], [0x01])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
# 打开TV串口;
|
|
@@ -337,19 +379,24 @@ if __name__ == "__main__":
|
|
|
print("gamma初始化")
|
|
|
tv.initGamma()
|
|
|
|
|
|
- print("读取 mac")
|
|
|
- data = tv.sendcmdEx(0xAA, [0xBE], [0x00])
|
|
|
+ #print("读取 mac")
|
|
|
+ #data = tv.sendcmdEx(CMDType.Read, 0xAA, [0xBE], [0x00])
|
|
|
|
|
|
print("读取 wifimac")
|
|
|
- data = tv.sendcmdEx(0xAA, [0xBE], [0x15])
|
|
|
+ data = tv.sendcmdEx(CMDType.Read, 0xAA, [0xBE], [0x15])
|
|
|
|
|
|
print("读取 clientType")
|
|
|
- data = tv.sendcmdEx(0xAA, [0x8C], [0x00])
|
|
|
+ data = tv.sendcmdEx(CMDType.Read, 0xAA, [0x8C], [0x00])
|
|
|
|
|
|
for item in [[0,0,0],[26,26,26],[51,51,51],[76,76,76],[102,102,102],[127,127,127],[153,153,153],[178,178,178],[204,204,204],[229,229,229],[255,255,255],[0,0,255],[255,0,0]]:
|
|
|
if item.__len__() == 3:
|
|
|
tv.send_parttern([int(item[0]), int(item[1]), int(item[2])])
|
|
|
time.sleep(0.5)
|
|
|
|
|
|
+ tv.gen_gzip('gamma.ini', 'gamma_cut.gzip')
|
|
|
+ tv.send_gamma('gamma_cut.gzip')
|
|
|
+
|
|
|
+ tv.send_gamma_active('gamma.ini')
|
|
|
+
|
|
|
print("关闭内置pattern")
|
|
|
tv.closeBuiltInPattern()
|