# -*- coding:utf-8 -*- import os, sys, time import abc import socket import json import numpy as np import struct import TestWizardBLK import binascii # 创建ProHeader数据类型 ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']}) class TestWizardClient(): def __init__(self): '''设备通讯超时值:毫秒''' self.device_timeout = 300 '''通信sock''' self.sock = None '''通信端口号''' self.port = 5566 '''通信状态''' self.constate = False '''连接服务器''' self.connect() def __def__(self): self.disconnect() '''连接服务器''' def connect(self): try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(('127.0.0.1', self.port)) # 没有返回值; self.sock.sendall('111') self.constate = True except Exception, e: print "TestWizardClient=> socket connect error:", e, self.port self.constate = False return self.constate # 发送消息; def __sendkey(self, keyname): '''是否连接''' if self.constate is False: if self.connect() is False: return None '''发送请求数据''' try: self.sock.settimeout(3) keyname = "ir>"+keyname phead = np.array([(0xAC, keyname.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead) '''一次性发送完整包''' self.sock.sendall(bytearray(phead.tobytes() + bytes(keyname))) '''分包发送:头、体''' # self.sock.sendall(phead.tobytes()) # self.sock.sendall(bytes(keyname)) self.sock.settimeout(None) # return False except Exception, e: print "__sendkey=> send Error:", e import traceback traceback.print_exc() self.disconnect() self.connect() return None try: self.sock.settimeout(2) recvBytes = bytearray(self.sock.recv(1024)) data = self.parseRetData(recvBytes, phead.itemsize) if int(data) == 1: print "Info:__sendkey %s success!"%keyname return True print "Error:__sendkey %s fail!"%keyname except Exception, e: print "__sendkey=> recv Error:", e return None def parseRetData(self, recvByteArray, headSize): version = recvByteArray[0]&0xff recvLen = self.byte4ToInt(recvByteArray, 1) if version == 0xAC and recvLen == recvByteArray.__len__(): byteArr = bytes(recvByteArray) data = byteArr[headSize: recvByteArray.__len__()] return data return "0" def byte4ToInt(self,bytearray, start): i = (bytearray[start] & 0xff) \ | (bytearray[start+1] & 0xff) << 8 \ | (bytearray[start+2] & 0xff) << 16\ | (bytearray[start+3] & 0xff) << 24 return i # 发送消息; def sendKey(self, keyname, times=1, duration=1): while times > 0: times -= 1 self.__sendkey(keyname) time.sleep(duration) # 发送多个按键; def sendKeys(self, keyNames, duration): for key in keyNames: self.__sendkey(key) time.sleep(duration) ''' 用于发送测试精灵指令,不区分子模块,也不获取发送结果,默认指令发送成功。 因为各子模块,有不返回结果,而返回结果的,结果格式也不一样。 ''' def __sendCmd(self, cmdStr): '''是否连接''' if self.constate is False: if self.connect() is False: return False '''发送请求数据''' try: self.sock.settimeout(3) cmdStr = "cmd>"+cmdStr phead = np.array([(0xAC, cmdStr.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead) '''一次性发送完整包''' ecmd = bytearray(phead.tobytes() + bytes(cmdStr)) self.sock.sendall(ecmd) except Exception, e: print "__sendCmd=> send Error:", e import traceback traceback.print_exc() self.disconnect() self.connect() return False try: self.sock.settimeout(10) recvBytes = bytearray(self.sock.recv(1024)) data = self.parseRetData(recvBytes, phead.itemsize) if int(data) == 1: print "Info:__sendkey success!" return True print "Error:__sendkey fail!" except Exception, e: print "__sendCmd=> recv Error:", e return False def sendBLEKey(self, bleKeyName): if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()): cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()] ret = self.__sendCmd(cmdStr) time.sleep(1) return ret else: return False ''' 电源开关,所有电源同时控制,不区分LAN口 :param onOff: ON代表开,OFF代表关,区分大小写。 :return True代表成功,False达标失败 ''' def setAllPower(self, onOff): if onOff <> "ON" and onOff <> "OFF": return False cmdStr = "*SET " + onOff ret = self.__sendCmd(cmdStr) time.sleep(1) return ret def loadSignalDataSet(self, file): pass def addSignalDataSet(self, file): pass def getDeviceName(self): pass def getScriptName(self): pass def getKeyName(self): pass def close(self): self.disconnect() def getCurrentDeviceName(self): pass '''断开连接''' def disconnect(self): try: self.sock.shutdown(2) self.sock.close() except Exception, e: print "TestWizardClient=> socket disconnect error:", e if __name__ == "__main__": tw = TestWizardClient() # tw.sendKey('POWER') tw.sendKey('down') # print 'sleep' # time.sleep(3) tw.sendKey('up') # print 111111 # print tw.sendBLEKey("uparrow") # time.sleep(3) # print tw.sendBLEKey("downarrow") # time.sleep(3) # print 2222222 # print tw.setAllPower("OFF") # print tw.setAllPower("ON")