testWizardClient.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. # -*- coding:utf-8 -*-
  2. import os, sys, time
  3. import abc
  4. import socket
  5. import json
  6. import numpy as np
  7. import struct
  8. import TestWizardBLK
  9. import binascii
  10. # 创建ProHeader数据类型
  11. ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})
  12. class TestWizardClient():
  13. def __init__(self):
  14. '''设备通讯超时值:毫秒'''
  15. self.device_timeout = 300
  16. '''通信sock'''
  17. self.sock = None
  18. '''通信端口号'''
  19. self.port = 5566
  20. '''通信状态'''
  21. self.constate = False
  22. '''连接服务器'''
  23. self.connect()
  24. def __def__(self):
  25. self.disconnect()
  26. '''连接服务器'''
  27. def connect(self):
  28. try:
  29. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  30. self.sock.connect(('127.0.0.1', self.port)) # 没有返回值;
  31. self.sock.sendall('111')
  32. self.constate = True
  33. except Exception, e:
  34. print "TestWizardClient=> socket connect error:", e, self.port
  35. self.constate = False
  36. return self.constate
  37. # 发送消息;
  38. def __sendCmd(self, cmdType, command):
  39. '''是否连接'''
  40. if self.constate is False:
  41. if self.connect() is False:
  42. return False
  43. '''发送请求数据'''
  44. try:
  45. self.sock.settimeout(3)
  46. command = cmdType + ">" + command
  47. phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
  48. '''一次性发送完整包'''
  49. self.sock.sendall(bytearray(phead.tobytes() + bytes(command)))
  50. self.sock.settimeout(None)
  51. except Exception, e:
  52. print "__send " + cmdType + "=> send Error:", e
  53. import traceback
  54. traceback.print_exc()
  55. self.disconnect()
  56. self.connect()
  57. return False
  58. try:
  59. self.sock.settimeout(2)
  60. recvBytes = bytearray(self.sock.recv(1024))
  61. data = self.parseRetData(recvBytes, phead.itemsize)
  62. if int(data) == 1:
  63. print "Info:__send %s: %s success!"%(cmdType, command)
  64. return True
  65. print "Error:__send %s: %s fail!"%(cmdType, command)
  66. except Exception, e:
  67. print "__send " + cmdType + "=> recv Error:", e
  68. return False
  69. def parseRetData(self, recvByteArray, headSize):
  70. version = recvByteArray[0]&0xff
  71. recvLen = self.byte4ToInt(recvByteArray, 1)
  72. if version == 0xAC and recvLen == recvByteArray.__len__():
  73. byteArr = bytes(recvByteArray)
  74. data = byteArr[headSize: recvByteArray.__len__()]
  75. return data
  76. return "0"
  77. def byte4ToInt(self,bytearray, start):
  78. i = (bytearray[start] & 0xff) \
  79. | (bytearray[start+1] & 0xff) << 8 \
  80. | (bytearray[start+2] & 0xff) << 16\
  81. | (bytearray[start+3] & 0xff) << 24
  82. return i
  83. # 发送消息;
  84. def sendKey(self, keyname, times=1, duration=1):
  85. while times > 0:
  86. times -= 1
  87. self.__sendkey(keyname)
  88. time.sleep(duration)
  89. # 发送多个按键;
  90. def sendKeys(self, keyNames, duration):
  91. for key in keyNames:
  92. self.__sendCmd("ir", key)
  93. time.sleep(duration)
  94. def sendBLEKey(self, bleKeyName):
  95. if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
  96. cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
  97. ret = self.__sendCmd("cmd", cmdStr)
  98. time.sleep(1)
  99. return ret
  100. else:
  101. return False
  102. '''
  103. 电源开关,所有电源同时控制,不区分LAN口
  104. :param onOff: ON代表开,OFF代表关,区分大小写。
  105. :return True代表成功,False达标失败
  106. '''
  107. def setAllPower(self, onOff):
  108. if onOff <> "ON" and onOff <> "OFF":
  109. return False
  110. cmdStr = "*SET " + onOff
  111. ret = self.__sendCmd("cmd", cmdStr)
  112. time.sleep(1)
  113. return ret
  114. def sendUsbSwitch(self, index):
  115. if index < 0 or index > 1:
  116. print u"usb 索引只能0、1。当前=", index
  117. return False
  118. return self.__sendCmd('usb', str(index))
  119. def loadSignalDataSet(self, file):
  120. pass
  121. def addSignalDataSet(self, file):
  122. pass
  123. def getDeviceName(self):
  124. pass
  125. def getScriptName(self):
  126. pass
  127. def getKeyName(self):
  128. pass
  129. def close(self):
  130. self.disconnect()
  131. def getCurrentDeviceName(self):
  132. pass
  133. '''断开连接'''
  134. def disconnect(self):
  135. try:
  136. self.sock.shutdown(2)
  137. self.sock.close()
  138. except Exception, e:
  139. print "TestWizardClient=> socket disconnect error:", e
  140. if __name__ == "__main__":
  141. tw = TestWizardClient()
  142. tw.sendUsbSwitch(0)
  143. time.sleep(2)
  144. tw.sendUsbSwitch(1)
  145. # tw.sendKey('POWER')
  146. # tw.sendKey('down')
  147. # print 'sleep'
  148. # time.sleep(3)
  149. # tw.sendKey('up')
  150. # print 111111
  151. # print tw.sendBLEKey("uparrow")
  152. # time.sleep(3)
  153. # print tw.sendBLEKey("downarrow")
  154. # time.sleep(3)
  155. # print 2222222
  156. # print tw.setAllPower("OFF")
  157. # print tw.setAllPower("ON")