123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- # -*- coding:utf-8 -*-
- import os, sys, time
- import abc
- import socket
- import json
- import numpy as np
- import struct
- import TestWizardBLK
- import binascii
- # 创建ProHeader数据类型
- ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})
- class TestWizardClient():
- def __init__(self):
- '''设备通讯超时值:毫秒'''
- self.device_timeout = 300
- '''通信sock'''
- self.sock = None
- '''通信端口号'''
- self.port = 5566
- '''通信状态'''
- self.constate = False
- '''连接服务器'''
- self.connect()
- def __def__(self):
- self.disconnect()
- '''连接服务器'''
- def connect(self):
- try:
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect(('127.0.0.1', self.port)) # 没有返回值;
- self.sock.sendall('111')
- self.constate = True
- except Exception, e:
- print "TestWizardClient=> socket connect error:", e, self.port
- self.constate = False
- return self.constate
- # 发送消息;
- def __sendkey(self, keyname):
- '''是否连接'''
- if self.constate is False:
- if self.connect() is False:
- return None
- '''发送请求数据'''
- try:
- self.sock.settimeout(3)
- keyname = "ir>"+keyname
- phead = np.array([(0xAC, keyname.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
- '''一次性发送完整包'''
- self.sock.sendall(bytearray(phead.tobytes() + bytes(keyname)))
- '''分包发送:头、体'''
- # self.sock.sendall(phead.tobytes())
- # self.sock.sendall(bytes(keyname))
- self.sock.settimeout(None)
- # return False
- except Exception, e:
- print "__sendkey=> send Error:", e
- import traceback
- traceback.print_exc()
- self.disconnect()
- self.connect()
- return None
- try:
- self.sock.settimeout(2)
- recvBytes = bytearray(self.sock.recv(1024))
- data = self.parseRetData(recvBytes, phead.itemsize)
- if int(data) == 1:
- print "Info:__sendkey %s success!"%keyname
- return True
- print "Error:__sendkey %s fail!"%keyname
- except Exception, e:
- print "__sendkey=> recv Error:", e
- return None
- def parseRetData(self, recvByteArray, headSize):
- version = recvByteArray[0]&0xff
- recvLen = self.byte4ToInt(recvByteArray, 1)
- if version == 0xAC and recvLen == recvByteArray.__len__():
- byteArr = bytes(recvByteArray)
- data = byteArr[headSize: recvByteArray.__len__()]
- return data
- return "0"
- def byte4ToInt(self,bytearray, start):
- i = (bytearray[start] & 0xff) \
- | (bytearray[start+1] & 0xff) << 8 \
- | (bytearray[start+2] & 0xff) << 16\
- | (bytearray[start+3] & 0xff) << 24
- return i
- # 发送消息;
- def sendKey(self, keyname, times=1, duration=1):
- while times > 0:
- times -= 1
- self.__sendkey(keyname)
- time.sleep(duration)
-
- # 发送多个按键;
- def sendKeys(self, keyNames, duration):
- for key in keyNames:
- self.__sendkey(key)
- time.sleep(duration)
- '''
- 用于发送测试精灵指令,不区分子模块,也不获取发送结果,默认指令发送成功。
- 因为各子模块,有不返回结果,而返回结果的,结果格式也不一样。
- '''
- def __sendCmd(self, cmdStr):
- '''是否连接'''
- if self.constate is False:
- if self.connect() is False:
- return False
- '''发送请求数据'''
- try:
- self.sock.settimeout(3)
- cmdStr = "cmd>"+cmdStr
- phead = np.array([(0xAC, cmdStr.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
- '''一次性发送完整包'''
- ecmd = bytearray(phead.tobytes() + bytes(cmdStr))
- self.sock.sendall(ecmd)
- except Exception, e:
- print "__sendCmd=> send Error:", e
- import traceback
- traceback.print_exc()
- self.disconnect()
- self.connect()
- return False
- try:
- self.sock.settimeout(10)
- recvBytes = bytearray(self.sock.recv(1024))
- data = self.parseRetData(recvBytes, phead.itemsize)
- if int(data) == 1:
- print "Info:__sendkey success!"
- return True
- print "Error:__sendkey fail!"
- except Exception, e:
- print "__sendCmd=> recv Error:", e
- return False
- def sendBLEKey(self, bleKeyName):
- if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
- cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
- ret = self.__sendCmd(cmdStr)
- time.sleep(1)
- return ret
- else:
- return False
- '''
- 电源开关,所有电源同时控制,不区分LAN口
- :param onOff: ON代表开,OFF代表关,区分大小写。
- :return True代表成功,False达标失败
- '''
- def setAllPower(self, onOff):
- if onOff <> "ON" and onOff <> "OFF":
- return False
- cmdStr = "*SET " + onOff
- ret = self.__sendCmd(cmdStr)
- time.sleep(1)
- return ret
- def loadSignalDataSet(self, file):
- pass
- def addSignalDataSet(self, file):
- pass
- def getDeviceName(self):
- pass
- def getScriptName(self):
- pass
- def getKeyName(self):
- pass
- def close(self):
- self.disconnect()
- def getCurrentDeviceName(self):
- pass
- '''断开连接'''
- def disconnect(self):
- try:
- self.sock.shutdown(2)
- self.sock.close()
- except Exception, e:
- print "TestWizardClient=> socket disconnect error:", e
- if __name__ == "__main__":
- tw = TestWizardClient()
- # tw.sendKey('POWER')
- tw.sendKey('down')
- # print 'sleep'
- # time.sleep(3)
- tw.sendKey('up')
- # print 111111
- # print tw.sendBLEKey("uparrow")
- # time.sleep(3)
- # print tw.sendBLEKey("downarrow")
- # time.sleep(3)
- # print 2222222
- # print tw.setAllPower("OFF")
- # print tw.setAllPower("ON")
|