testWizardClient.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. # -*- coding:utf-8 -*-
  2. import os, sys, time
  3. import abc
  4. import socket
  5. import json
  6. import numpy as np
  7. import struct
  8. import TestWizardBLK
  9. import binascii
  10. # 创建ProHeader数据类型
  11. ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})
  12. class TestWizardClient():
  13. def __init__(self):
  14. '''设备通讯超时值:毫秒'''
  15. self.device_timeout = 300
  16. '''通信sock'''
  17. self.sock = None
  18. '''通信端口号'''
  19. self.port = 5566
  20. '''通信状态'''
  21. self.constate = False
  22. '''连接服务器'''
  23. self.connect()
  24. def __def__(self):
  25. self.disconnect()
  26. '''连接服务器'''
  27. def connect(self):
  28. try:
  29. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  30. self.sock.connect(('127.0.0.1', self.port)) # 没有返回值;
  31. self.sock.sendall('111')
  32. self.constate = True
  33. except Exception, e:
  34. print "TestWizardClient=> socket connect error:", e, self.port
  35. self.constate = False
  36. return self.constate
  37. # 发送消息;
  38. def __sendCmd(self, cmdType, command):
  39. '''是否连接'''
  40. if self.constate is False:
  41. if self.connect() is False:
  42. return False
  43. '''发送请求数据'''
  44. try:
  45. self.sock.settimeout(3)
  46. command = cmdType + ">" + command
  47. phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
  48. '''一次性发送完整包'''
  49. self.sock.sendall(bytearray(phead.tobytes() + bytes(command)))
  50. self.sock.settimeout(None)
  51. except Exception, e:
  52. print "__send " + cmdType + "=> send Error:", e
  53. import traceback
  54. traceback.print_exc()
  55. self.disconnect()
  56. self.connect()
  57. return False
  58. try:
  59. self.sock.settimeout(2)
  60. recvBytes = bytearray(self.sock.recv(1024))
  61. data = self.parseRetData(recvBytes, phead.itemsize)
  62. if int(data) == 1:
  63. print "Info:__send %s: %s success!" % (cmdType, command)
  64. return True
  65. print "Error:__send %s: %s fail!" % (cmdType, command)
  66. except Exception, e:
  67. print "__send " + cmdType + "=> recv Error:", e
  68. return False
  69. def parseRetData(self, recvByteArray, headSize):
  70. version = recvByteArray[0] & 0xff
  71. recvLen = self.byte4ToInt(recvByteArray, 1)
  72. if version == 0xAC and recvLen == recvByteArray.__len__():
  73. byteArr = bytes(recvByteArray)
  74. data = byteArr[headSize: recvByteArray.__len__()]
  75. return data
  76. return "0"
  77. def byte4ToInt(self, bytearray, start):
  78. i = (bytearray[start] & 0xff) \
  79. | (bytearray[start + 1] & 0xff) << 8 \
  80. | (bytearray[start + 2] & 0xff) << 16 \
  81. | (bytearray[start + 3] & 0xff) << 24
  82. return i
  83. # 发送消息;
  84. def sendKey(self, keyname, times=1, duration=1):
  85. while times > 0:
  86. times -= 1
  87. self.__sendCmd("ir", keyname)
  88. time.sleep(duration)
  89. # 发送多个按键;
  90. def sendKeys(self, keyNames, duration):
  91. for key in keyNames:
  92. self.__sendCmd("ir", key)
  93. time.sleep(duration)
  94. def sendBLEKey(self, bleKeyName):
  95. if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
  96. cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
  97. ret = self.__sendCmd("cmd", cmdStr)
  98. # time.sleep(1)
  99. return ret
  100. else:
  101. return False
  102. '''
  103. 函数:电源开关,如果lanNum=-1设置所有LAN口;
  104. 参数:
  105. onOff:字符串,只能是ON或OFF
  106. lanNum:整型或None
  107. 返回:设置成功返回True
  108. '''
  109. def setPower(self, onOff, lanNum=-1):
  110. cmdStr = None
  111. if onOff <> "ON" and onOff <> "OFF":
  112. return False
  113. if isinstance(lanNum, int) == False:
  114. return False
  115. if lanNum <= -1: # 认为负数都是全控制;
  116. cmdStr = "*SET POWER " + onOff
  117. elif lanNum > 0: # LAN0不是电源控制,排除掉;
  118. cmdStr = "*SET LAN %d POWER %s" % (lanNum, onOff)
  119. if cmdStr is None:
  120. return False
  121. ret = self.__sendCmd("cmd", cmdStr)
  122. # time.sleep(1)
  123. return ret
  124. def sendUsbSwitch(self, index):
  125. if index < 0 or index > 1:
  126. print u"usb 索引只能0、1。当前=", index
  127. return False
  128. return self.__sendCmd('usb', str(index))
  129. def loadSignalDataSet(self, file):
  130. pass
  131. def addSignalDataSet(self, file):
  132. pass
  133. def getDeviceName(self):
  134. pass
  135. def getScriptName(self):
  136. pass
  137. def getKeyName(self):
  138. pass
  139. def close(self):
  140. self.disconnect()
  141. def getCurrentDeviceName(self):
  142. pass
  143. '''断开连接'''
  144. def disconnect(self):
  145. try:
  146. self.sock.shutdown(2)
  147. self.sock.close()
  148. except Exception, e:
  149. print "TestWizardClient=> socket disconnect error:", e
  150. # 单例模块;
  151. tw_singleton = TestWizardClient()
  152. print u"tw_singleton地址", tw_singleton
  153. def API_UsbSwitch(index):
  154. return tw_singleton.sendUsbSwitch(index)
  155. def API_SetPower(onOff, lanNum):
  156. return tw_singleton.setPower(onOff, lanNum)
  157. if __name__ == "__main__":
  158. API_SetPower("ON", -1)
  159. time.sleep(3)
  160. API_SetPower("OFF", -1)
  161. time.sleep(3)
  162. API_SetPower("ON", 3)
  163. time.sleep(3)
  164. API_SetPower("OFF", 3)
  165. # 切换usb;
  166. time.sleep(3)
  167. API_UsbSwitch(0)
  168. time.sleep(10)
  169. API_UsbSwitch(1)
  170. # 发送红外;
  171. tw_singleton.sendKey("setting")