# -*- coding:utf-8 -*- import os, sys, time import abc import socket import json import numpy as np import struct import TestWizardBLK import binascii # 创建ProHeader数据类型 ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']}) class TestWizardClient(): def __init__(self): '''设备通讯超时值:毫秒''' self.device_timeout = 300 '''通信sock''' self.sock = None '''通信端口号''' self.port = 5566 '''通信状态''' self.constate = False '''连接服务器''' self.connect() def __def__(self): self.disconnect() '''连接服务器''' def connect(self): try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(('127.0.0.1', self.port)) # 没有返回值; self.sock.sendall('111') self.constate = True except Exception, e: print "TestWizardClient=> socket connect error:", e, self.port self.constate = False return self.constate # 发送消息; def __sendCmd(self, cmdType, command): '''是否连接''' if self.constate is False: if self.connect() is False: return False '''发送请求数据''' try: self.sock.settimeout(3) command = cmdType + ">" + command phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead) '''一次性发送完整包''' self.sock.sendall(bytearray(phead.tobytes() + bytes(command))) self.sock.settimeout(None) except Exception, e: print "__send " + cmdType + "=> send Error:", e import traceback traceback.print_exc() self.disconnect() self.connect() return False try: self.sock.settimeout(2) recvBytes = bytearray(self.sock.recv(1024)) data = self.parseRetData(recvBytes, phead.itemsize) if int(data) == 1: print "Info:__send %s: %s success!"%(cmdType, command) return True print "Error:__send %s: %s fail!"%(cmdType, command) except Exception, e: print "__send " + cmdType + "=> recv Error:", e return False def parseRetData(self, recvByteArray, headSize): version = recvByteArray[0]&0xff recvLen = self.byte4ToInt(recvByteArray, 1) if version == 0xAC and recvLen == recvByteArray.__len__(): byteArr = bytes(recvByteArray) data = byteArr[headSize: recvByteArray.__len__()] return data return "0" def byte4ToInt(self,bytearray, start): i = (bytearray[start] & 0xff) \ | (bytearray[start+1] & 0xff) << 8 \ | (bytearray[start+2] & 0xff) << 16\ | (bytearray[start+3] & 0xff) << 24 return i # 发送消息; def sendKey(self, keyname, times=1, duration=1): while times > 0: times -= 1 self.__sendkey(keyname) time.sleep(duration) # 发送多个按键; def sendKeys(self, keyNames, duration): for key in keyNames: self.__sendCmd("ir", key) time.sleep(duration) def sendBLEKey(self, bleKeyName): if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()): cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()] ret = self.__sendCmd("cmd", cmdStr) time.sleep(1) return ret else: return False ''' 电源开关,所有电源同时控制,不区分LAN口 :param onOff: ON代表开,OFF代表关,区分大小写。 :return True代表成功,False达标失败 ''' def setAllPower(self, onOff): if onOff <> "ON" and onOff <> "OFF": return False cmdStr = "*SET " + onOff ret = self.__sendCmd("cmd", cmdStr) time.sleep(1) return ret def sendUsbSwitch(self, index): if index < 0 or index > 1: print u"usb 索引只能0、1。当前=", index return False return self.__sendCmd('usb', str(index)) def loadSignalDataSet(self, file): pass def addSignalDataSet(self, file): pass def getDeviceName(self): pass def getScriptName(self): pass def getKeyName(self): pass def close(self): self.disconnect() def getCurrentDeviceName(self): pass '''断开连接''' def disconnect(self): try: self.sock.shutdown(2) self.sock.close() except Exception, e: print "TestWizardClient=> socket disconnect error:", e if __name__ == "__main__": tw = TestWizardClient() tw.sendUsbSwitch(0) time.sleep(2) tw.sendUsbSwitch(1) # tw.sendKey('POWER') # tw.sendKey('down') # print 'sleep' # time.sleep(3) # tw.sendKey('up') # print 111111 # print tw.sendBLEKey("uparrow") # time.sleep(3) # print tw.sendBLEKey("downarrow") # time.sleep(3) # print 2222222 # print tw.setAllPower("OFF") # print tw.setAllPower("ON")