testWizardClient.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. # -*- coding:utf-8 -*-
  2. import os, sys, time
  3. import abc
  4. import socket
  5. import json
  6. import numpy as np
  7. import struct
  8. import TestWizardBLK
  9. import binascii
  10. # 创建ProHeader数据类型
  11. ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})
  12. class TestWizardClient():
  13. def __init__(self):
  14. '''设备通讯超时值:毫秒'''
  15. self.device_timeout = 300
  16. '''通信sock'''
  17. self.sock = None
  18. '''通信端口号'''
  19. self.port = 5566
  20. '''通信状态'''
  21. self.constate = False
  22. '''连接服务器'''
  23. self.connect()
  24. def __def__(self):
  25. self.disconnect()
  26. '''连接服务器'''
  27. def connect(self):
  28. try:
  29. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  30. self.sock.connect(('127.0.0.1', self.port)) # 没有返回值;
  31. self.sock.sendall('111')
  32. self.constate = True
  33. except Exception, e:
  34. print "TestWizardClient=> socket connect error:", e, self.port
  35. self.constate = False
  36. return self.constate
  37. # 发送消息;
  38. def __sendkey(self, keyname):
  39. '''是否连接'''
  40. if self.constate is False:
  41. if self.connect() is False:
  42. return None
  43. '''发送请求数据'''
  44. try:
  45. self.sock.settimeout(3)
  46. keyname = "ir>"+keyname
  47. phead = np.array([(0xAC, keyname.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
  48. '''一次性发送完整包'''
  49. self.sock.sendall(bytearray(phead.tobytes() + bytes(keyname)))
  50. '''分包发送:头、体'''
  51. # self.sock.sendall(phead.tobytes())
  52. # self.sock.sendall(bytes(keyname))
  53. self.sock.settimeout(None)
  54. # return False
  55. except Exception, e:
  56. print "__sendkey=> send Error:", e
  57. import traceback
  58. traceback.print_exc()
  59. self.disconnect()
  60. self.connect()
  61. return None
  62. try:
  63. self.sock.settimeout(2)
  64. recvBytes = bytearray(self.sock.recv(1024))
  65. data = self.parseRetData(recvBytes, phead.itemsize)
  66. if int(data) == 1:
  67. print "Info:__sendkey %s success!"%keyname
  68. return True
  69. print "Error:__sendkey %s fail!"%keyname
  70. except Exception, e:
  71. print "__sendkey=> recv Error:", e
  72. return None
  73. def parseRetData(self, recvByteArray, headSize):
  74. version = recvByteArray[0]&0xff
  75. recvLen = self.byte4ToInt(recvByteArray, 1)
  76. if version == 0xAC and recvLen == recvByteArray.__len__():
  77. byteArr = bytes(recvByteArray)
  78. data = byteArr[headSize: recvByteArray.__len__()]
  79. return data
  80. return "0"
  81. def byte4ToInt(self,bytearray, start):
  82. i = (bytearray[start] & 0xff) \
  83. | (bytearray[start+1] & 0xff) << 8 \
  84. | (bytearray[start+2] & 0xff) << 16\
  85. | (bytearray[start+3] & 0xff) << 24
  86. return i
  87. # 发送消息;
  88. def sendKey(self, keyname, times=1, duration=1):
  89. while times > 0:
  90. times -= 1
  91. self.__sendkey(keyname)
  92. time.sleep(duration)
  93. # 发送多个按键;
  94. def sendKeys(self, keyNames, duration):
  95. for key in keyNames:
  96. self.__sendkey(key)
  97. time.sleep(duration)
  98. '''
  99. 用于发送测试精灵指令,不区分子模块,也不获取发送结果,默认指令发送成功。
  100. 因为各子模块,有不返回结果,而返回结果的,结果格式也不一样。
  101. '''
  102. def __sendCmd(self, cmdStr):
  103. '''是否连接'''
  104. if self.constate is False:
  105. if self.connect() is False:
  106. return False
  107. '''发送请求数据'''
  108. try:
  109. self.sock.settimeout(3)
  110. cmdStr = "cmd>"+cmdStr
  111. phead = np.array([(0xAC, cmdStr.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
  112. '''一次性发送完整包'''
  113. ecmd = bytearray(phead.tobytes() + bytes(cmdStr))
  114. self.sock.sendall(ecmd)
  115. except Exception, e:
  116. print "__sendCmd=> send Error:", e
  117. import traceback
  118. traceback.print_exc()
  119. self.disconnect()
  120. self.connect()
  121. return False
  122. try:
  123. self.sock.settimeout(10)
  124. recvBytes = bytearray(self.sock.recv(1024))
  125. data = self.parseRetData(recvBytes, phead.itemsize)
  126. if int(data) == 1:
  127. print "Info:__sendkey success!"
  128. return True
  129. print "Error:__sendkey fail!"
  130. except Exception, e:
  131. print "__sendCmd=> recv Error:", e
  132. return False
  133. def sendBLEKey(self, bleKeyName):
  134. if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
  135. cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
  136. ret = self.__sendCmd(cmdStr)
  137. time.sleep(1)
  138. return ret
  139. else:
  140. return False
  141. '''
  142. 电源开关,所有电源同时控制,不区分LAN口
  143. :param onOff: ON代表开,OFF代表关,区分大小写。
  144. :return True代表成功,False达标失败
  145. '''
  146. def setAllPower(self, onOff):
  147. if onOff <> "ON" and onOff <> "OFF":
  148. return False
  149. cmdStr = "*SET " + onOff
  150. ret = self.__sendCmd(cmdStr)
  151. time.sleep(1)
  152. return ret
  153. def loadSignalDataSet(self, file):
  154. pass
  155. def addSignalDataSet(self, file):
  156. pass
  157. def getDeviceName(self):
  158. pass
  159. def getScriptName(self):
  160. pass
  161. def getKeyName(self):
  162. pass
  163. def close(self):
  164. self.disconnect()
  165. def getCurrentDeviceName(self):
  166. pass
  167. '''断开连接'''
  168. def disconnect(self):
  169. try:
  170. self.sock.shutdown(2)
  171. self.sock.close()
  172. except Exception, e:
  173. print "TestWizardClient=> socket disconnect error:", e
  174. if __name__ == "__main__":
  175. tw = TestWizardClient()
  176. # tw.sendKey('POWER')
  177. tw.sendKey('down')
  178. # print 'sleep'
  179. # time.sleep(3)
  180. tw.sendKey('up')
  181. # print 111111
  182. # print tw.sendBLEKey("uparrow")
  183. # time.sleep(3)
  184. # print tw.sendBLEKey("downarrow")
  185. # time.sleep(3)
  186. # print 2222222
  187. # print tw.setAllPower("OFF")
  188. # print tw.setAllPower("ON")